Class: CMDx::Pipeline

Inherits:
Object
Defined in:
lib/cmdx/pipeline.rb

Overview

Runs a Workflow's declared task groups. Each group selects a strategy (:sequential by default, or :parallel). A group failure halts the pipeline by echoing the failed result's signal through throw!, which bubbles up through Runtime as the workflow's own failure.

Groups may opt into batch semantics with continue_on_failure: true. In that case every task in the group runs to completion, and all failures are aggregated into the workflow's errors, keyed as "TaskClass.input" for input/validation errors and "TaskClass.<status>" for bare fail! reasons. The pipeline then halts on the first failure in declaration order.
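The batch behaviour described above can be illustrated with a minimal, self-contained sketch. It does not use CMDx itself: FakeResult and run_batch are hypothetical stand-ins, and the "failed" status string is an assumption made for the example. Only the "TaskClass.<status>" key shape is taken from the description.

```ruby
# Hypothetical stand-in for a task result; not a CMDx class.
FakeResult = Struct.new(:task_name, :status, :reason) do
  def failed?
    status == "failed"
  end
end

# Runs every callable in the group, records each failure under a
# "TaskName.status" key, and returns the first failure in declaration
# order (nil when nothing failed).
def run_batch(tasks, errors)
  results = tasks.map(&:call) # every task runs, even after a failure
  failures = results.select(&:failed?)

  failures.each do |result|
    key = "#{result.task_name}.#{result.status}"
    (errors[key] ||= []) << result.reason
  end

  failures.first
end

errors = {}
tasks = [
  -> { FakeResult.new("ChargeCard", "failed", "card declined") },
  -> { FakeResult.new("SendReceipt", "success", nil) },
  -> { FakeResult.new("UpdateLedger", "failed", "ledger locked") }
]

halt = run_batch(tasks, errors)
puts halt.task_name
puts errors.keys.inspect
```

In this sketch all three tasks run, both failures land in errors, and the value returned for halting is the ChargeCard result because it was declared first.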


Constructor Details

#initialize(workflow) ⇒ Pipeline

Returns a new instance of Pipeline.

Parameters:

  • workflow (Task)

    workflow instance whose class includes Workflow



# File 'lib/cmdx/pipeline.rb', line 30

def initialize(workflow)
  @workflow = workflow
  @executed = []
end

Class Method Details

.execute(workflow) ⇒ void

This method returns an undefined value.

Parameters:

  • workflow (Task)

    workflow instance whose class includes Workflow



# File 'lib/cmdx/pipeline.rb', line 23

def execute(workflow)
  new(workflow).execute
end

Instance Method Details

#execute ⇒ void

This method returns an undefined value.

Iterates every group in the workflow's pipeline, respecting :if/:unless and the :strategy key. Any group that produces a failed result halts execution by throwing through the workflow.
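As a rough illustration of how an :if/:unless guard can gate a group, here is a sketch of one plausible evaluation rule. It is not the implementation of Util.satisfied?: the evaluate and satisfied? helpers, and the choice to accept Symbols (method names) and Procs, are assumptions made for the example.

```ruby
# Hypothetical helper: resolve a condition against a receiver.
# Symbols are treated as method names, Procs run in the receiver's
# context, and anything else is used as a literal value.
def evaluate(condition, receiver)
  case condition
  when Symbol then receiver.send(condition)
  when Proc then receiver.instance_exec(&condition)
  else condition
  end
end

# A group runs when its :if (if given) is truthy and its :unless
# (if given) is falsy. A nil condition means "not specified".
def satisfied?(if_cond, unless_cond, receiver)
  return false if !if_cond.nil? && !evaluate(if_cond, receiver)
  return false if !unless_cond.nil? && evaluate(unless_cond, receiver)

  true
end

# Hypothetical workflow-like object used only for this demonstration.
DemoWorkflow = Struct.new(:premium, :dry_run) do
  def premium?
    premium
  end
end

workflow = DemoWorkflow.new(true, false)
puts satisfied?(:premium?, nil, workflow)
puts satisfied?(nil, proc { dry_run }, workflow)
```

Under these assumptions, a group with neither key always runs, and a group whose guard is not satisfied is skipped without affecting later groups.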

On halt, every previously executed task instance whose result is success? is sent #rollback (when defined) in reverse execution order, providing saga-style compensation. Each compensated result has its :rolled_back option flipped to true. Skipped tasks are excluded; the failing task itself is rolled back by Runtime and is not re-invoked here. Exceptions raised inside a compensator propagate; handling them is the developer's responsibility.

Raises:

  • (ArgumentError)

    for an unknown strategy

  • (StandardError)

    anything raised by a task's #rollback
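The compensation order described above can be sketched without CMDx. FakeTask, CompensableTask, and compensate are hypothetical names, and the rolled_back flag is modelled as a plain attribute rather than a result option. The sketch only shows the reverse-order walk and the two skip rules (not successful, or no #rollback defined).

```ruby
# Hypothetical stand-in for an executed task; it has no compensator.
class FakeTask
  attr_reader :name, :status, :rolled_back

  def initialize(name, status, log)
    @name = name
    @status = status
    @log = log
    @rolled_back = false
  end

  def success?
    status == :success
  end

  def mark_rolled_back!
    @rolled_back = true
  end
end

# Same stand-in, but with a compensator that records its invocation.
class CompensableTask < FakeTask
  def rollback
    @log << name
  end
end

# Walks the executed tasks newest-first and compensates the successful
# ones that define #rollback. Exceptions from #rollback are not rescued.
def compensate(executed)
  executed.reverse_each do |task|
    next unless task.success?
    next unless task.respond_to?(:rollback)

    task.rollback
    task.mark_rolled_back!
  end
end

log = []
executed = [
  CompensableTask.new("reserve_stock", :success, log),
  FakeTask.new("notify", :success, log),
  CompensableTask.new("apply_coupon", :skipped, log),
  CompensableTask.new("charge_card", :success, log)
]

compensate(executed)
puts log.inspect
```

Here the log ends up as charge_card followed by reserve_stock: the skipped task and the task without a compensator are left alone. Because nothing is rescued, a #rollback that raises would stop the walk and leave earlier tasks uncompensated, which is why handling such exceptions is left to the developer.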



# File 'lib/cmdx/pipeline.rb', line 50

def execute
  @workflow.class.pipeline.each do |group|
    next unless Util.satisfied?(group.options[:if], group.options[:unless], @workflow)

    halt =
      case strategy = group.options[:strategy]
      when :sequential, NilClass
        run_sequential(group)
      when :parallel
        run_parallel(group)
      else
        raise ArgumentError, "invalid strategy: #{strategy.inspect}"
      end

    next unless halt

    rollback_executed!
    @workflow.send(:throw!, halt)
  end
end
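The strategy dispatch above relies on how Ruby's case/when compares values with ===. A Symbol in a when clause matches by equality, while NilClass matches nil because Class#=== tests instance membership. The following standalone sketch isolates that behaviour; pick_runner is a hypothetical helper that returns a method name instead of calling the real runners.

```ruby
# Hypothetical helper mirroring the shape of the dispatch: returns the
# name of the runner that would be chosen for a given strategy value.
def pick_runner(strategy)
  case strategy
  when :sequential, NilClass # NilClass === nil, so a missing key lands here
    :run_sequential
  when :parallel
    :run_parallel
  else
    raise ArgumentError, "invalid strategy: #{strategy.inspect}"
  end
end

puts pick_runner(nil)
puts pick_runner(:sequential)
puts pick_runner(:parallel)

begin
  pick_runner(:random)
rescue ArgumentError => e
  puts e.message
end
```

A group that omits :strategy therefore behaves exactly like one that declares :sequential, and any other value fails fast with ArgumentError rather than being silently ignored.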