Module: CMDx::Workflow::ClassMethods

Defined in:
lib/cmdx/workflow.rb

Instance Method Summary collapse

Instance Method Details

#inherited(subclass) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Parameters:

  • subclass (Class)

    newly defined workflow subclass



17
18
19
20
# File 'lib/cmdx/workflow.rb', line 17

def inherited(subclass)
  super
  subclass.instance_variable_set(:@pipeline, pipeline.dup)
end

#pipelineArray<ExecutionGroup>

Returns declared groups, in order.

Returns:



23
24
25
# File 'lib/cmdx/workflow.rb', line 23

def pipeline
  @pipeline ||= []
end

#tasks(*tasks, **options) ⇒ Array<ExecutionGroup> Also known as: task

Declares a task group. With no arguments, returns the pipeline. Tasks must be Task subclasses.

Parameters:

  • tasks (Array<Class<Task>>)
  • options (Hash{Symbol => Object})

Options Hash (**options):

  • :strategy (:sequential, :parallel) — default: :sequential
  • :pool_size (Integer)

    parallel worker/fiber count

  • :executor (:threads, :fibers, #call) — default: :threads

    parallel dispatch backend. :fibers requires a Fiber.scheduler to be installed (e.g. Async { ... }). A custom callable accepting jobs:, concurrency:, on_job: may also be passed.

  • :merger (:last_write_wins, :deep_merge, :no_merge, #call) — default: :last_write_wins

    how successful parallel contexts are folded back into the workflow context. Merging happens in declaration order. A callable ->(workflow_context, result) { ... } may be passed to implement custom behavior (e.g. namespacing by task name).

  • :continue_on_failure (Boolean) — default: false

    when true, run every task in the group to completion (even after a failure) and aggregate all failures into the workflow's errors. Each failed result's errors are merged in with keys namespaced as "TaskClass.input"; failures with no errors entries (bare fail!("reason")) record under "TaskClass.<status>" (e.g. "MyTask.failed") with result.reason as the message (falling back to the localized cmdx.reasons.unspecified string when reason is nil). The pipeline still halts after the group with the first failure (declaration order) as the signal origin. Applies to both :sequential and :parallel strategies. When false (default), :sequential halts on the first failure and :parallel cancels pending tasks (in-flight tasks still finish).

  • :if (Symbol, Proc, #call)
  • :unless (Symbol, Proc, #call)

Returns:

Raises:

  • (DefinitionError)

    when called with options but no tasks

  • (TypeError)

    when any element isn't a Task subclass



61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/cmdx/workflow.rb', line 61

def tasks(*tasks, **options)
  raise DefinitionError, "#{name}: cannot declare an empty task group" if tasks.empty?

  pipeline << ExecutionGroup.new(
    tasks:
      tasks.map do |task|
        next task if task.is_a?(Class) && (task <= Task)

        raise TypeError, "#{task.inspect} is not a Task"
      end,
    options:
  )
end