Workflows¶
Compose multiple tasks into ordered pipelines. A workflow is a Task subclass that includes CMDx::Workflow; the module supplies a #work implementation that delegates to CMDx::Pipeline, which runs the declared task groups against a shared context.
Because workflows are tasks, they inherit every Task feature: inputs, callbacks, middlewares, settings, outputs, and retries. Use these to validate workflow-level inputs, set up shared state, or react to workflow outcomes.
```ruby
class OnboardingWorkflow < CMDx::Task
  include CMDx::Workflow

  before_execution :load_user
  on_failed :notify_admin!

  required :user_id, coerce: :integer
  output :onboarded_at

  task CreateProfile
  task SetupPreferences
  task SendWelcome

  private

  def load_user
    context.user = User.find(user_id)
  end

  def notify_admin!
    AdminMailer.onboarding_failed(context.user).deliver_later
  end
end
```
Declarations¶
Tasks run in declaration order, sharing the workflow's context.
Warning
Don't define #work on a workflow — Workflow#work delegates to Pipeline. Defining your own raises CMDx::ImplementationError.
Task¶
task and tasks are aliases — use either interchangeably. Each call appends a new group to the pipeline.
```ruby
class OnboardingWorkflow < CMDx::Task
  include CMDx::Workflow

  task CreateUserProfile
  task SetupAccountPreferences
  tasks SendWelcomeEmail, SendWelcomeSms, CreateDashboard
end
```
Every entry must be a Task subclass; anything else raises TypeError at declaration time.
Group Options¶
Options apply to the entire group:
| Option | Default | Description |
|---|---|---|
| `strategy:` | `:sequential` | `:sequential` or `:parallel` |
| `pool_size:` | `tasks.size` | Worker/fiber count when `strategy: :parallel` |
| `executor:` | `:threads` | Parallel dispatch backend: `:threads`, `:fibers`, or a callable. `:fibers` requires a `Fiber.scheduler` to be installed (e.g. inside `Async { ... }`) |
| `merger:` | `:last_write_wins` | How successful parallel contexts fold back into the workflow context: `:last_write_wins`, `:deep_merge`, `:no_merge`, or a callable `->(workflow_context, result) { ... }` |
| `continue_on_failure:` | `false` | When `true`, run every task in the group to completion even after a failure, and aggregate all failures into the workflow's errors (keyed as the Symbol `:"TaskClass.<input>"` for input/validation errors and `:"TaskClass.<status>"` for bare `fail!` reasons). Applies to both strategies. When `false` (default), `:sequential` halts on the first failure and `:parallel` cancels pending tasks (in-flight tasks still finish) |
| `if:` / `unless:` | — | Skip the entire group when the predicate isn't satisfied |
Conditionals¶
Conditionals support multiple syntaxes for flexible execution control. They're evaluated against the workflow instance.
```ruby
class ContentAccessCheck
  def call(workflow)
    workflow.context.user.can?(:publish_content)
  end
end

class OnboardingWorkflow < CMDx::Task
  include CMDx::Workflow

  # Symbols resolve to instance methods on the workflow
  task SendWelcomeEmail, if: :email_configured?

  # Procs and lambdas are instance_exec'd against the workflow
  task SendWelcomeEmail, if: -> { Rails.env.production? }
  task SendWelcomeEmail, if: proc { context.features_enabled? }

  # Class or instance: must respond to #call(workflow)
  task SendWelcomeEmail, unless: ContentAccessCheck
  task SendWelcomeEmail, if: ContentAccessCheck.new

  # The conditional applies to every task in the group
  tasks SendWelcomeEmail, CreateDashboard, SetupTutorial, if: :email_configured?

  private

  def email_configured?
    context.user.email_address.match?(/@mycompany\.com/)
  end
end
```
Halt Behavior¶
A workflow halts on the first failed result in any group. Skipped tasks never halt the pipeline — they're treated as no-ops and the next task runs as normal.
When a task fails, the pipeline echoes its reason, state, and status through the workflow via throw!, so the workflow's own result is failed? with the same reason. The propagated signal carries the failed leaf as its origin, so result.origin / result.threw_failure / result.caused_failure all point at the originating task without scanning the chain:
```ruby
class AnalyticsWorkflow < CMDx::Task
  include CMDx::Workflow

  task CollectMetrics     # If fails → workflow stops, AnalyticsWorkflow is failed
  task FilterOutliers     # If skipped → workflow continues
  task GenerateDashboard  # Only runs if no upstream failure occurred
end

result = AnalyticsWorkflow.execute

result.failed?             #=> true
result.reason              #=> "metrics service unreachable"
result.origin.task         #=> CollectMetrics
result.caused_failure.task #=> CollectMetrics
```
To make a "soft" failure non-halting, have the task skip! instead of fail!. There is no per-group or per-workflow setting to ignore failures.
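As a sketch of that conversion (FilterOutliers here is hypothetical, and skip! is assumed to accept a reason string the same way fail! does):

```ruby
# Hypothetical task: skip! records a no-op instead of halting the pipeline.
class FilterOutliers < CMDx::Task
  def work
    # fail!("no metrics") would stop the whole workflow here;
    # skip! lets the next task run as normal.
    skip!("no metrics to filter") if context.metrics.empty?

    context.metrics = context.metrics.reject(&:outlier?)
  end
end
```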
Rollback in Workflows¶
When a task fails, Runtime calls its #rollback method (if defined) immediately after work returns and before the failure is throw!n up to the workflow. Concretely, the failed leaf task's lifecycle is: perform_work → perform_rollback → on_* callbacks → result finalization → throw to workflow.
When a workflow's pipeline halts, Pipeline then walks every previously executed task instance whose result is success? in reverse execution order and invokes #rollback on any that defines it — saga-style compensation across the whole pipeline. Each compensated result's #rolled_back? becomes true. Skipped tasks are excluded; the failing task itself is rolled back by Runtime and is not re-invoked. Exceptions raised inside a compensator propagate to the caller — handling them is the developer's responsibility.
```ruby
class PaymentWorkflow < CMDx::Task
  include CMDx::Workflow

  task ReserveInventory # Succeeds → no rollback
  task ChargeCard       # Fails → ChargeCard#rollback runs
  task SendConfirmation # Never runs (workflow halts on failure)
end

class ChargeCard < CMDx::Task
  def work
    context.charge = PaymentGateway.charge(context.amount)
    fail!("Declined") if context.charge.declined?
  end

  def rollback
    PaymentGateway.void(context.charge.id) if context.charge
  end
end
```
Compensation across tasks
Define #rollback on each task with side effects — the workflow compensates them in reverse order on failure. Use a workflow-level on_failed callback only when the cleanup doesn't belong to any single task.
```ruby
class PaymentWorkflow < CMDx::Task
  include CMDx::Workflow

  task ReserveInventory # rolled back second on failure (in reverse)
  task ChargeCard       # rolled back first if it succeeded; Runtime rolls it back if it failed
  task SendConfirmation
end

class ReserveInventory < CMDx::Task
  def work
    context.reservation_id = Inventory.reserve(context.sku, context.qty)
  end

  def rollback
    Inventory.release(context.reservation_id)
  end
end
```
Parallel groups
Parallel tasks run on a deep_dup'd context, so their #rollback sees only that copy — not the merged workflow context. Keep parallel compensators self-contained (capture what they need during work).
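The pattern, stripped down to plain Ruby (a sketch of the principle, not CMDx API — FakeSeatStore and ReserveSeat are illustrative stand-ins): write every identifier the compensator needs into the task's own context copy during work, so rollback never has to read the merged workflow context.

```ruby
# Plain-Ruby sketch: a parallel task's rollback only sees the context copy
# it wrote during work, so capture everything it needs there.
class ReserveSeat
  def initialize(store)
    @store = store # stands in for an external service
  end

  def work(context)
    # Record the identifier in *this task's* context copy during work...
    context[:seat_id] = @store.reserve
  end

  def rollback(context)
    # ...so compensation is self-contained: no merged workflow state needed.
    @store.release(context[:seat_id]) if context[:seat_id]
  end
end

class FakeSeatStore
  attr_reader :reserved

  def initialize
    @reserved = []
  end

  def reserve
    id = @reserved.size + 1
    @reserved << id
    id
  end

  def release(id)
    @reserved.delete(id)
  end
end

store = FakeSeatStore.new
copy  = {} # the deep-duplicated context the parallel task receives
task  = ReserveSeat.new(store)
task.work(copy)
task.rollback(copy)
store.reserved # => []
```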
Nested Workflows¶
Workflows are tasks, so they nest naturally:
```ruby
class EmailPreparationWorkflow < CMDx::Task
  include CMDx::Workflow

  task ValidateRecipients
  task CompileTemplate
end

class EmailDeliveryWorkflow < CMDx::Task
  include CMDx::Workflow

  tasks SendEmails, TrackDeliveries
end

class CompleteEmailWorkflow < CMDx::Task
  include CMDx::Workflow

  task EmailPreparationWorkflow
  task EmailDeliveryWorkflow, if: proc { context.preparation_successful? }
  task GenerateDeliveryReport
end
```
A nested workflow's failure echoes through its parent the same way a leaf task's failure does, and the chain captures every result for traceability.
Parallel Execution¶
Run a group concurrently using native Ruby threads. No external dependencies required.
```ruby
class SendWelcomeNotifications < CMDx::Task
  include CMDx::Workflow

  # One thread per task (default)
  tasks SendWelcomeEmail, SendWelcomeSms, SendWelcomePush, strategy: :parallel

  # Bounded thread pool
  tasks SendWelcomeEmail, SendWelcomeSms, SendWelcomePush,
        strategy: :parallel, pool_size: 2

  # Default behavior: pending parallel tasks are cancelled once any sibling fails
  tasks ChargeCard, ReserveInventory, EmitAnalytics, strategy: :parallel

  # Batch processing: run every task and collect every failure into result.errors
  tasks ProcessOrder1, ProcessOrder2, ProcessOrder3,
        strategy: :parallel, continue_on_failure: true
end
```
Warning
Each parallel task gets a deep-duplicated context copy, merged back in declaration order (not completion order) — last writer to a key wins, so prefer distinct keys per task.
On failure, pending tasks are cancelled (in-flight ones still finish and their contexts merge) and the failed result propagates via throw!. With continue_on_failure: true, every task runs to completion and failures aggregate on workflow.errors (keyed :"TaskClass.<input>" for validation errors, :"TaskClass.<status>" for bare fail! reasons); the first declaration-order failure is the one re-thrown.
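A plain-Ruby model of the fold (illustrative only, not CMDx internals) shows why the merge is deterministic: sibling context copies are folded back in declaration order, so the last declared writer of a key wins no matter which thread finished first.

```ruby
# Plain-Ruby sketch: fold sibling context copies back in declaration order.
workflow_context = { user_id: 1 }

# Deep-duplicated copies, listed in *declaration* order; imagine the threads
# actually completed in the order B, C, A — the fold ignores that.
sibling_contexts = [
  { user_id: 1, email_sent: true, status: "email" }, # task A
  { user_id: 1, sms_sent:   true, status: "sms"   }, # task B
  { user_id: 1, push_sent:  true, status: "push"  }  # task C
]

merged = sibling_contexts.reduce(workflow_context) { |ctx, copy| ctx.merge(copy) }

merged[:status] # last declared writer wins => "push"
```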
Batch processing with continue_on_failure¶
For batch-style groups where you want to know about every failure rather than stopping at the first one, set continue_on_failure: true. Failures are aggregated into the workflow's errors collection.
```ruby
class ProcessOrders < CMDx::Task
  include CMDx::Workflow

  tasks ProcessOrderA, ProcessOrderB, ProcessOrderC, continue_on_failure: true
end

result = ProcessOrders.execute

result.failed? # => true (any task in the group failed)
result.errors.to_h
# => {
#   :"ProcessOrderA.failed" => ["card declined"],
#   :"ProcessOrderC.amount" => ["amount must be greater than 0"]
# }
```
The pipeline still halts after the failed group — subsequent groups do not run. The first failure (declaration order) is the signal origin.
Executors¶
The :executor option swaps the concurrency backend while keeping the rest of the parallel semantics (context isolation, merge-on-success, fail-fast) identical.
```ruby
# Default — native Ruby threads
tasks A, B, C, strategy: :parallel, executor: :threads

# Fiber scheduler — requires Fiber.scheduler to be installed on the caller
tasks A, B, C, strategy: :parallel, executor: :fibers, pool_size: 10

# Custom callable
tasks A, B, C, strategy: :parallel, executor: MyPool.method(:run)
```
:fibers spawns one fiber per job bounded by pool_size (via a semaphore) and relies on whatever scheduler the caller has installed — most commonly the async gem:
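For instance, with the async gem (an assumption here — any installed Fiber.scheduler works), wrapping the call in an Async block installs a scheduler for its duration:

```ruby
require "async"

# Async { } installs a fiber scheduler for the block's duration, which is
# what executor: :fibers needs in order to yield on blocking I/O.
Async do
  result = SendWelcomeNotifications.execute
end
```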
Without a scheduler, :fibers raises at run time — the gem itself stays zero-dep.
A user-supplied executor is any object responding to call(jobs:, concurrency:, on_job:). It must invoke on_job.call(job) for each job and block until all jobs have completed. Chain propagation, cancellation, and context merging are already baked into on_job; the executor only decides how to dispatch.
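A minimal conforming executor might look like the following (a sketch — only the call(jobs:, concurrency:, on_job:) contract comes from the gem; the bounded pool of plain Ruby threads is illustrative):

```ruby
# A minimal executor honoring the contract: invoke on_job.call(job) for every
# job, at most `concurrency` at a time, and block until all jobs finish.
BoundedThreadExecutor = lambda do |jobs:, concurrency:, on_job:|
  queue = Queue.new
  jobs.each { |job| queue << job }

  workers = Array.new([concurrency, jobs.size].min) do
    Thread.new do
      loop do
        job = begin
          queue.pop(true) # non-blocking pop; raises ThreadError when empty
        rescue ThreadError
          break
        end
        on_job.call(job)
      end
    end
  end

  workers.each(&:join) # block until every job has completed
end

# Usage sketch: dispatch three jobs with at most two live threads.
done = Queue.new
BoundedThreadExecutor.call(
  jobs: [1, 2, 3],
  concurrency: 2,
  on_job: ->(job) { done << job }
)
done.size # => 3
```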
Executors are resolved from a per-task registry (CMDx::Executors). Built-ins ship with :threads and :fibers; register your own named executor once and reference it by symbol from :executor:
```ruby
class ApplicationTask < CMDx::Task
  register :executor, :bounded_pool, MyPool.method(:run)
end

class ShipItAll < ApplicationTask
  include CMDx::Workflow

  tasks A, B, C, strategy: :parallel, executor: :bounded_pool
end
```
The same registry is available globally via CMDx.configuration.executors.register(...).
Merge strategies¶
After every successful sibling completes, each sibling's duplicated context is folded back into the workflow context. The default is last-write-wins in declaration order — reliable and fast, but brittle when two tasks write a nested structure under the same key. :merger lets you pick the collision policy up front.
```ruby
# Default — shallow, last declared task wins on conflicts
tasks A, B, C, strategy: :parallel, merger: :last_write_wins

# Recursive hash merge — nested structures are combined instead of replaced
tasks A, B, C, strategy: :parallel, merger: :deep_merge

# Don't touch the workflow context at all
tasks A, B, C, strategy: :parallel, merger: :no_merge

# Custom — e.g. namespace each sibling's output under its class name
tasks A, B, C, strategy: :parallel,
      merger: ->(ctx, result) { ctx[result.task.name] = result.context.to_h }
```
Behavior notes:

- Merging always walks successful results in declaration order, never completion order — the fold is deterministic even though parallel execution isn't.
- `:deep_merge` recurses only into `Hash` values; non-hash collisions (Integer, String, Array, custom objects) still follow last-write-wins, so a scalar on either side wins over a hash on the other.
- `:no_merge` keeps the parallel tasks' side effects (each sibling's `result.context` is still reachable via `result.chain`) but nothing is written back to the workflow context. Useful when you're only interested in per-task telemetry, or when tasks own their own persistence.
- A callable receives `(workflow_context, result)` and is free to write whatever shape you want. Failed results never reach the merger.
- Merge strategies are resolved from a per-task registry (`CMDx::Mergers`). Register your own named merger with `register :merger, :name, callable` (or on `CMDx.configuration.mergers`) and reference it by symbol from `:merger`.
```ruby
class BuildDashboard < CMDx::Task
  include CMDx::Workflow

  tasks FetchRevenue, FetchTraffic, FetchErrors,
        strategy: :parallel, merger: :deep_merge

  # FetchRevenue: context.metrics = { revenue: ... }
  # FetchTraffic: context.metrics = { visitors: ... }
  # After merge:  context.metrics == { revenue: ..., visitors: ..., errors: ... }
end
```
Task Generator¶
Generate a workflow scaffold with the task generator. It produces:
```ruby
# app/tasks/send_notifications.rb
class SendNotifications < ApplicationTask
  include CMDx::Workflow

  # Docs: https://drexed.github.io/cmdx/workflows
end
```
If ApplicationTask isn't defined the generator falls back to CMDx::Task.
Tip
Use present-tense verb + pluralized noun for workflow names: SendNotifications, DownloadFiles, ValidateDocuments.