Workflows & Activities

This guide covers the basics of creating workflows and activities in Romancy.

DefineWorkflow

DefineWorkflow creates a workflow orchestrator function.

Basic Usage

package main

import (
	"github.com/i2y/romancy"
)

type MyInput struct {
	Param1 string `json:"param1"`
	Param2 int    `json:"param2"`
}

type MyResult struct {
	Result string `json:"result"`
	Param2 int    `json:"param2"`
}

var myWorkflow = romancy.DefineWorkflow("my_workflow",
	func(ctx *romancy.WorkflowContext, input MyInput) (MyResult, error) {
		// Orchestration logic here
		result, err := someActivity.Execute(ctx, input.Param1)
		if err != nil {
			return MyResult{}, err
		}
		return MyResult{Result: result, Param2: input.Param2}, nil
	},
)

Options

| Option | Description |
|---|---|
| romancy.WithEventHandler(true) | Automatically registers as CloudEvents handler |

Starting Workflows

// Start a workflow programmatically
instanceID, err := romancy.StartWorkflow(ctx, app, myWorkflow, MyInput{
	Param1: "hello",
	Param2: 42,
})

CloudEvents Auto-Registration (Opt-in)

By default, workflows are NOT automatically registered as CloudEvents handlers. This is a security measure: a workflow can only be started by an incoming event if you opt in.

To enable auto-registration:

type OrderResult struct {
	Status  string `json:"status"`
	OrderID string `json:"order_id"`
}

var orderWorkflow = romancy.DefineWorkflow("order_workflow",
	func(ctx *romancy.WorkflowContext, orderID string) (OrderResult, error) {
		// This workflow will automatically handle CloudEvents
		// with type="order_workflow"
		return OrderResult{Status: "completed", OrderID: orderID}, nil
	},
	romancy.WithEventHandler(true), // Enable CloudEvents auto-registration
)

How it works:

  1. CloudEvent arrives with type="order_workflow"
  2. Romancy extracts data from the event
  3. Workflow starts with data as parameters
  4. Workflow instance ID is returned

Security note: Use WithEventHandler(true) only for workflows you want publicly accessible via CloudEvents.

DefineActivity

DefineActivity creates an activity that performs business logic.

Basic Usage

type EmailResult struct {
	Sent      bool   `json:"sent"`
	MessageID string `json:"message_id"`
}

var sendEmail = romancy.DefineActivity("send_email",
	func(ctx context.Context, email, subject string) (EmailResult, error) {
		// Call external service
		response, err := emailService.Send(email, subject)
		if err != nil {
			return EmailResult{}, err
		}
		return EmailResult{
			Sent:      true,
			MessageID: response.ID,
		}, nil
	},
)

Automatic Transactions

Activities are automatically transactional:

type OrderResult struct {
	OrderID string `json:"order_id"`
}

var createOrder = romancy.DefineActivity("create_order",
	func(ctx context.Context, orderID string) (OrderResult, error) {
		// All operations in a single transaction:
		// 1. Activity execution
		// 2. History recording
		// 3. Event publishing (if using transactional outbox)
		return OrderResult{OrderID: orderID}, nil
	},
)

Custom Database Operations

For atomic operations with your own database tables, use the session from WorkflowContext:

var createOrderWithDB = romancy.DefineActivity("create_order_with_db",
	func(ctx context.Context, orderID string) (OrderResult, error) {
		// Access workflow context for session
		wfCtx := romancy.GetWorkflowContext(ctx)
		session := wfCtx.Session()

		// Your database operations
		order := &Order{OrderID: orderID}
		if err := session.Create(order).Error; err != nil {
			return OrderResult{}, err
		}

		// Romancy automatically commits (or rolls back on error)
		return OrderResult{OrderID: orderID}, nil
	},
)

Retry Policies

Activities automatically retry on failure with exponential backoff. This provides resilience against transient failures like network timeouts or temporary service unavailability.

Default Retry Behavior

By default, activities are attempted up to 5 times (the initial execution plus up to 4 retries) with exponential backoff:

type PaymentResult struct {
	TransactionID string `json:"transaction_id"`
}

var callPaymentAPI = romancy.DefineActivity("call_payment_api",
	func(ctx context.Context, amount float64) (PaymentResult, error) {
		// Default: 5 attempts with exponential backoff
		response, err := paymentService.Charge(amount)
		if err != nil {
			return PaymentResult{}, err
		}
		return PaymentResult{TransactionID: response.ID}, nil
	},
)

Default schedule:

  • Attempt 1: Immediate execution
  • Attempt 2: 1 second delay
  • Attempt 3: 2 seconds delay
  • Attempt 4: 4 seconds delay
  • Attempt 5: 8 seconds delay

If all 5 attempts fail, a retry-exhausted error is returned.

Custom Retry Policies

Configure retry behavior per activity using WithRetryPolicy. The retry.Policy type comes from the github.com/i2y/romancy/retry package:

type PaymentStatusResult struct {
	Status string `json:"status"`
}

var criticalPayment = romancy.DefineActivity("critical_payment",
	func(ctx context.Context, orderID string) (PaymentStatusResult, error) {
		response, err := paymentService.Process(orderID)
		if err != nil {
			return PaymentStatusResult{}, err
		}
		return PaymentStatusResult{Status: response.Status}, nil
	},
	romancy.WithRetryPolicy(&retry.Policy{
		MaxAttempts:     10,                      // More attempts for critical operations
		InitialInterval: 500 * time.Millisecond, // Faster initial retry
		Multiplier:      1.5,                    // Slower exponential growth
		MaxInterval:     30 * time.Second,       // Cap delay at 30 seconds
	}),
)

retry.Policy fields:

| Field | Type | Default | Description |
|---|---|---|---|
| MaxAttempts | int | 5 | Maximum retry attempts (0 = infinite) |
| InitialInterval | time.Duration | 1s | First retry delay |
| Multiplier | float64 | 2.0 | Exponential backoff multiplier |
| MaxInterval | time.Duration | 60s | Maximum retry delay |

Application-Level Default Policy

Set a default retry policy for all activities in your application:

app := romancy.NewApp(
	romancy.WithDatabase("postgres://localhost/workflows"),
	romancy.WithDefaultRetryPolicy(&retry.Policy{
		MaxAttempts:     10,
		InitialInterval: 2 * time.Second,
		MaxInterval:     2 * time.Minute,
	}),
)

Policy resolution order:

  1. Activity-level policy (highest priority)
  2. Application-level policy
  3. Framework default (5 attempts)
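
The resolution order amounts to "first policy that is set wins". Here is a minimal sketch of that rule, using a hypothetical trimmed-down `policy` type rather than the real retry.Policy. It selects a whole policy and does not merge individual fields; whether Romancy fills unset fields from a lower-priority policy is not covered here.

```go
package main

import "fmt"

// policy is a hypothetical, trimmed-down stand-in for retry.Policy.
type policy struct {
	MaxAttempts int
}

// frameworkDefault mirrors the documented default of 5 attempts.
var frameworkDefault = &policy{MaxAttempts: 5}

// resolve returns the first non-nil policy in priority order:
// activity-level, then application-level, then the framework default.
func resolve(activityLevel, appLevel *policy) *policy {
	if activityLevel != nil {
		return activityLevel
	}
	if appLevel != nil {
		return appLevel
	}
	return frameworkDefault
}

func main() {
	app := &policy{MaxAttempts: 10}
	activity := &policy{MaxAttempts: 3}

	fmt.Println(resolve(activity, app).MaxAttempts) // activity-level wins
	fmt.Println(resolve(nil, app).MaxAttempts)      // falls back to the app policy
	fmt.Println(resolve(nil, nil).MaxAttempts)      // falls back to the default
}
```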

Non-Retryable Errors

Use TerminalError for errors that should never be retried:

type ValidationResult struct {
	OrderID string `json:"order_id"`
	Valid   bool   `json:"valid"`
}

var validateOrder = romancy.DefineActivity("validate_order",
	func(ctx context.Context, orderID string) (ValidationResult, error) {
		order, err := db.GetOrder(orderID)
		if err != nil {
			return ValidationResult{}, err
		}

		if order == nil {
			// Don't retry - order doesn't exist
			return ValidationResult{}, romancy.TerminalError(fmt.Errorf("order %s not found", orderID))
		}

		if order.Status == "cancelled" {
			// Business rule violation - don't retry
			return ValidationResult{}, romancy.TerminalError(fmt.Errorf("order %s is cancelled", orderID))
		}

		return ValidationResult{OrderID: orderID, Valid: true}, nil
	},
)

When to use TerminalError:

  • ✅ Validation failures (invalid input, malformed data)
  • ✅ Business rule violations (insufficient funds, order cancelled)
  • ✅ Permanent errors (resource not found, access denied)
  • ❌ Transient errors (network timeout, service unavailable) - let these retry!

Built-in Retry Policies

Romancy provides helper functions for common scenarios:

import "github.com/i2y/romancy/retry"

type ProcessResult struct {
	Processed bool `json:"processed"`
}

// Default policy (5 attempts, exponential backoff)
var defaultActivity = romancy.DefineActivity("default_activity",
	func(ctx context.Context, data string) (ProcessResult, error) {
		return ProcessResult{Processed: true}, nil
	},
	romancy.WithRetryPolicy(retry.DefaultPolicy()),
)

// No retries - fail immediately
var noRetryActivity = romancy.DefineActivity("no_retry_activity",
	func(ctx context.Context, data string) (ProcessResult, error) {
		return ProcessResult{Processed: true}, nil
	},
	romancy.WithRetryPolicy(retry.NoRetry()),
)

// Fixed interval retries
var fixedRetryActivity = romancy.DefineActivity("fixed_retry_activity",
	func(ctx context.Context, data string) (ProcessResult, error) {
		return ProcessResult{Processed: true}, nil
	},
	romancy.WithRetryPolicy(retry.Fixed(5, 2*time.Second)), // 5 attempts, 2s interval
)

// Exponential backoff
var exponentialActivity = romancy.DefineActivity("exponential_activity",
	func(ctx context.Context, data string) (ProcessResult, error) {
		return ProcessResult{Processed: true}, nil
	},
	romancy.WithRetryPolicy(retry.Exponential(10, 100*time.Millisecond, 30*time.Second)),
)

Activity IDs and Deterministic Replay

Romancy automatically assigns IDs to activities for deterministic replay after crashes. Understanding when to use manual IDs vs. auto-generated IDs is important.

Auto-Generated IDs (Default - Recommended)

For sequential execution, Romancy automatically generates IDs in the format "{function_name}:{counter}":

type WorkflowResult struct {
	Status string `json:"status"`
}

var myWorkflow = romancy.DefineWorkflow("my_workflow",
	func(ctx *romancy.WorkflowContext, orderID string) (WorkflowResult, error) {
		// Auto-generated IDs: "validate:1", "process:1", "notify:1"
		_, _ = validate.Execute(ctx, orderID)    // "validate:1"
		_, _ = process.Execute(ctx, orderID)     // "process:1"
		_, _ = notify.Execute(ctx, orderID)      // "notify:1"
		return WorkflowResult{Status: "completed"}, nil
	},
)

How it works:

  • First call to validate.Execute() → "validate:1"
  • Second call to validate.Execute() → "validate:2"
  • First call to process.Execute() → "process:1"
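
The counter scheme can be modeled with one counter per activity name. This is a minimal sketch of the idea under that assumption; `idGenerator` is a hypothetical type, not Romancy's internal one.

```go
package main

import "fmt"

// idGenerator hands out "{name}:{counter}" IDs, with one counter per name.
type idGenerator struct {
	counters map[string]int
}

func newIDGenerator() *idGenerator {
	return &idGenerator{counters: make(map[string]int)}
}

// next increments the counter for name and returns the resulting ID.
func (g *idGenerator) next(name string) string {
	g.counters[name]++
	return fmt.Sprintf("%s:%d", name, g.counters[name])
}

func main() {
	g := newIDGenerator()
	fmt.Println(g.next("validate"))
	fmt.Println(g.next("validate"))
	fmt.Println(g.next("process"))
}
```

A fresh generator fed the same sequence of calls yields the same IDs, which is what lets a replay after a crash match each call to its recorded result, provided the workflow code runs the calls in the same order.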

Even with conditional branches, auto-generation works correctly:

type CreditCheckResult struct {
	Score int `json:"score"`
}

type LoanResult struct {
	Status      string `json:"status"`
	ApplicantID string `json:"applicant_id"`
}

var loanApproval = romancy.DefineWorkflow("loan_approval",
	func(ctx *romancy.WorkflowContext, applicantID string) (LoanResult, error) {
		creditResult, _ := checkCredit.Execute(ctx, applicantID) // "check_credit:1"

		if creditResult.Score >= 700 {
			result, _ := approve.Execute(ctx, applicantID)  // "approve:1"
			return result, nil
		} else {
			result, _ := reject.Execute(ctx, applicantID)   // "reject:1"
			return result, nil
		}
	},
)

Manual IDs (Required for Concurrent Execution)

Manual activity ID specification is required ONLY for concurrent execution using goroutines:

type FetchResult struct {
	URL     string `json:"url"`
	Content string `json:"content"`
}

type ConcurrentResult struct {
	Results []FetchResult `json:"results"`
}

var concurrentWorkflow = romancy.DefineWorkflow("concurrent_workflow",
	func(ctx *romancy.WorkflowContext, urls []string) (ConcurrentResult, error) {
		// Manual IDs required for concurrent execution
		var wg sync.WaitGroup
		results := make([]FetchResult, len(urls))

		for i, url := range urls {
			wg.Add(1)
			go func(idx int, u string) {
				defer wg.Done()
				// Use WithActivityID for concurrent execution
				result, _ := fetchData.Execute(ctx, u,
					romancy.WithActivityID(fmt.Sprintf("fetch_data:%d", idx+1)))
				results[idx] = result
			}(i, url)
		}

		wg.Wait()
		return ConcurrentResult{Results: results}, nil
	},
)

When manual IDs are required:

  • Goroutine-based concurrent execution
  • Any scenario where execution order is non-deterministic

Best Practices

Do: Rely on auto-generation for sequential execution

result1, _ := activityOne.Execute(ctx, data)
result2, _ := activityTwo.Execute(ctx, data)

Don’t: Manually specify IDs for sequential execution

// Unnecessary - adds noise
result1, _ := activityOne.Execute(ctx, data, romancy.WithActivityID("activity_one:1"))
result2, _ := activityTwo.Execute(ctx, data, romancy.WithActivityID("activity_two:1"))

Workflow vs. Activity: When to Use Which?

Use DefineWorkflow for:

  • ✅ Orchestrating multiple steps
  • ✅ Coordinating activities
  • ✅ Defining business processes
  • ✅ Decision logic (if/else, loops)

Example:

type OnboardingResult struct {
	Status string `json:"status"`
}

type AccountResult struct {
	AccountID string `json:"account_id"`
	Email     string `json:"email"`
}

var userOnboarding = romancy.DefineWorkflow("user_onboarding",
	func(ctx *romancy.WorkflowContext, userID string) (OnboardingResult, error) {
		// Orchestration logic
		account, _ := createAccount.Execute(ctx, userID)
		_, _ = sendWelcomeEmail.Execute(ctx, account.Email)
		_, _ = setupPreferences.Execute(ctx, userID)
		return OnboardingResult{Status: "completed"}, nil
	},
)

Use DefineActivity for:

  • ✅ Database writes
  • ✅ API calls
  • ✅ File I/O
  • ✅ External service calls
  • ✅ Any side-effecting operation

Example:

var createAccount = romancy.DefineActivity("create_account",
	func(ctx context.Context, userID string) (AccountResult, error) {
		// Business logic
		account, err := db.CreateUser(userID)
		if err != nil {
			return AccountResult{}, err
		}
		return AccountResult{
			AccountID: account.ID,
			Email:     account.Email,
		}, nil
	},
)

Complete Example

Here’s a complete example showing workflows and activities together:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/i2y/romancy"
)

// Data models
type UserInput struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
	Name   string `json:"name"`
}

type UserResult struct {
	UserID    string `json:"user_id"`
	AccountID string `json:"account_id"`
	Status    string `json:"status"`
}

type DatabaseRecordResult struct {
	AccountID string `json:"account_id"`
	Email     string `json:"email"`
	Name      string `json:"name"`
}

type WelcomeEmailResult struct {
	Sent  bool   `json:"sent"`
	Email string `json:"email"`
}

type ProfileSettings struct {
	Theme         string `json:"theme"`
	Notifications bool   `json:"notifications"`
}

type UserProfileResult struct {
	ProfileID string          `json:"profile_id"`
	Settings  ProfileSettings `json:"settings"`
}

// Activities
var createDatabaseRecord = romancy.DefineActivity("create_database_record",
	func(ctx context.Context, userID, email, name string) (DatabaseRecordResult, error) {
		fmt.Printf("Creating user %s in database\n", userID)
		// Simulate database write
		return DatabaseRecordResult{
			AccountID: fmt.Sprintf("ACC-%s", userID),
			Email:     email,
			Name:      name,
		}, nil
	},
)

var sendWelcomeEmail = romancy.DefineActivity("send_welcome_email",
	func(ctx context.Context, email, name string) (WelcomeEmailResult, error) {
		fmt.Printf("Sending welcome email to %s\n", email)
		// Simulate email service
		return WelcomeEmailResult{Sent: true, Email: email}, nil
	},
)

var createUserProfile = romancy.DefineActivity("create_user_profile",
	func(ctx context.Context, accountID, name string) (UserProfileResult, error) {
		fmt.Printf("Creating profile for %s\n", accountID)
		// Simulate profile creation
		return UserProfileResult{
			ProfileID: fmt.Sprintf("PROF-%s", accountID),
			Settings:  ProfileSettings{Theme: "light", Notifications: true},
		}, nil
	},
)

// Workflow
var userRegistrationWorkflow = romancy.DefineWorkflow("user_registration",
	func(ctx *romancy.WorkflowContext, input UserInput) (UserResult, error) {
		// Step 1: Database record
		account, err := createDatabaseRecord.Execute(ctx, input.UserID, input.Email, input.Name)
		if err != nil {
			return UserResult{}, err
		}

		// Step 2: Welcome email
		_, err = sendWelcomeEmail.Execute(ctx, account.Email, account.Name)
		if err != nil {
			return UserResult{}, err
		}

		// Step 3: User profile
		_, err = createUserProfile.Execute(ctx, account.AccountID, account.Name)
		if err != nil {
			return UserResult{}, err
		}

		return UserResult{
			UserID:    input.UserID,
			AccountID: account.AccountID,
			Status:    "completed",
		}, nil
	},
)

func main() {
	app := romancy.NewApp(
		romancy.WithDatabase("users.db"),
		romancy.WithWorkerID("worker-1"),
	)

	ctx := context.Background()

	if err := app.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer app.Shutdown(ctx)

	// Start workflow
	instanceID, err := romancy.StartWorkflow(ctx, app, userRegistrationWorkflow, UserInput{
		UserID: "user_123",
		Email:  "user@example.com",
		Name:   "John Doe",
	})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Workflow started: %s\n", instanceID)
}

Best Practices

1. Keep Workflows Simple

Good:

type OrderProcessResult struct {
	Status string `json:"status"`
}

type InventoryResult struct {
	Total float64 `json:"total"`
}

var processOrder = romancy.DefineWorkflow("process_order",
	func(ctx *romancy.WorkflowContext, orderID string) (OrderProcessResult, error) {
		inventory, _ := reserveInventory.Execute(ctx, orderID)
		_, _ = processPayment.Execute(ctx, inventory.Total)
		_, _ = shipOrder.Execute(ctx, orderID)
		return OrderProcessResult{Status: "completed"}, nil
	},
)

Bad:

var processOrder = romancy.DefineWorkflow("process_order",
	func(ctx *romancy.WorkflowContext, orderID string) (OrderProcessResult, error) {
		// Don't put business logic in workflows!
		inventoryData := db.Query("SELECT ...")  // ❌
		total := calculateTotal(inventoryData)    // ❌
		externalAPI.Call(...)                     // ❌
		return OrderProcessResult{Status: "completed"}, nil
	},
)

2. Activities Should Be Focused

Good:

type SendEmailResult struct {
	Sent bool `json:"sent"`
}

var sendEmail = romancy.DefineActivity("send_email",
	func(ctx context.Context, email, subject string) (SendEmailResult, error) {
		// Single responsibility: send email
		_, err := emailService.Send(email, subject)
		if err != nil {
			return SendEmailResult{}, err
		}
		return SendEmailResult{Sent: true}, nil
	},
)

Bad:

var sendEmailAndUpdateDBAndLog = romancy.DefineActivity("send_email_and_update_db_and_log",
	func(ctx context.Context, ...) (SendEmailResult, error) {
		// Too many responsibilities!
		emailService.Send(...)
		db.Update(...)
		logger.Log(...)
		// Break this into 3 separate activities!
		return SendEmailResult{}, nil
	},
)

3. Use Go Structs for Type Safety

Good:

type OrderInput struct {
	OrderID string  `json:"order_id"`
	Amount  float64 `json:"amount"`
}

type OrderWorkflowResult struct {
	Status  string `json:"status"`
	OrderID string `json:"order_id"`
}

var orderWorkflow = romancy.DefineWorkflow("order_workflow",
	func(ctx *romancy.WorkflowContext, input OrderInput) (OrderWorkflowResult, error) {
		// Type-safe, structured input and output
		return OrderWorkflowResult{Status: "completed", OrderID: input.OrderID}, nil
	},
)

Bad:

var orderWorkflow = romancy.DefineWorkflow("order_workflow",
	func(ctx *romancy.WorkflowContext, orderID string, amount float64) (OrderWorkflowResult, error) {
		// Multiple parameters - harder to extend, no validation
		return OrderWorkflowResult{}, nil
	},
)

Managing Workflow Instances

Romancy provides several methods for querying and managing workflow instances.

Get a Specific Instance

instance, err := app.GetInstance(ctx, "wf_abc123")
if err != nil {
    // Handle error
}
if instance == nil {
    // Instance not found
}
fmt.Println(instance.Status, instance.WorkflowName)

List Instances with Filters

import "github.com/i2y/romancy/internal/storage"

result, err := app.ListInstances(ctx, storage.ListInstancesOptions{
    WorkflowNameFilter: "order",      // Partial match
    StatusFilter:       "running",    // Exact match
    InstanceIDFilter:   "order_",     // Partial match
    Limit:              50,           // Page size (default: 50)
})
if err != nil {
    // Handle error
}

for _, inst := range result.Instances {
    fmt.Println(inst.InstanceID, inst.Status)
}

// Paginate
if result.HasMore {
    nextResult, err := app.ListInstances(ctx, storage.ListInstancesOptions{
        PageToken: result.NextPageToken,
    })
    if err != nil {
        // Handle error
    }
    _ = nextResult // Process the next page here
}

Find Instances by Input Data

Query workflow instances by values in their input data:

// Find orders for a specific customer
instances, err := app.FindInstances(ctx, map[string]any{
    "customer_id": "cust_123",
})

// Find orders with specific status in input
// (a new variable name avoids redeclaring instances with :=)
pendingOrders, err := app.FindInstances(ctx, map[string]any{
    "order.status":   "pending",
    "order.priority": "high",
})

JSON Path Syntax

Input filters support dot-notation for nested fields:

| Pattern | Matches |
|---|---|
| "customer_id" | Top-level field |
| "order.id" | Nested field {"order": {"id": ...}} |
| "items.0.sku" | Array access (if supported by DB) |

Full Options

result, err := app.FindInstancesWithOptions(ctx, storage.ListInstancesOptions{
    InputFilters: map[string]any{
        "customer.id": "cust_123",
        "order.type": "subscription",
    },
    StatusFilter:       storage.StatusRunning,  // Optional: filter by status
    WorkflowNameFilter: "order",                // Optional: filter by workflow name
    Limit:              100,
})

Supported Value Types

  • Strings: Exact match
  • Numbers: Numeric comparison
  • Booleans: True/false
  • Null: Null comparison

Database Support

| Database | JSON Path Support |
|---|---|
| SQLite | json_extract() |
| PostgreSQL | #>> operator |
| MySQL | JSON_EXTRACT() |
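
As a rough illustration of how a dot-notation path could map onto each of these mechanisms, the sketch below builds the extraction expression per database. It is not Romancy's query builder: `jsonPathExpr` and the `input_data` column name are assumptions made for this example, and the dialect names are arbitrary labels.

```go
package main

import (
	"fmt"
	"strings"
)

// isIndex reports whether seg consists only of digits.
func isIndex(seg string) bool {
	if seg == "" {
		return false
	}
	for _, r := range seg {
		if r < '0' || r > '9' {
			return false
		}
	}
	return true
}

// validSegment allows only letters, digits, and underscores, so that
// path text can never break out of the quoted SQL literal.
func validSegment(seg string) bool {
	if seg == "" {
		return false
	}
	for _, r := range seg {
		ok := r == '_' || (r >= '0' && r <= '9') || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z')
		if !ok {
			return false
		}
	}
	return true
}

// jsonPathExpr builds the SQL expression that extracts path from a JSON column.
func jsonPathExpr(dialect, column, path string) (string, error) {
	segs := strings.Split(path, ".")
	for _, s := range segs {
		if !validSegment(s) {
			return "", fmt.Errorf("invalid path segment %q", s)
		}
	}
	switch dialect {
	case "sqlite", "mysql":
		// Both use "$.key[index]" paths; numeric segments become indexes.
		var b strings.Builder
		b.WriteString("$")
		for _, s := range segs {
			if isIndex(s) {
				b.WriteString("[" + s + "]")
			} else {
				b.WriteString("." + s)
			}
		}
		fn := "json_extract"
		if dialect == "mysql" {
			fn = "JSON_EXTRACT"
		}
		return fmt.Sprintf("%s(%s, '%s')", fn, column, b.String()), nil
	case "postgres":
		// #>> takes a text array of keys and indexes and returns text.
		return fmt.Sprintf("%s #>> '{%s}'", column, strings.Join(segs, ",")), nil
	default:
		return "", fmt.Errorf("unsupported dialect %q", dialect)
	}
}

func main() {
	for _, d := range []string{"sqlite", "postgres", "mysql"} {
		expr, err := jsonPathExpr(d, "input_data", "items.0.sku")
		fmt.Println(d, expr, err)
	}
}
```

This sketch treats every numeric segment as an array index for SQLite and MySQL, so an object key made only of digits would be misread. That ambiguity is one reason array access is described as database-dependent.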
