Class: CMDx::Parallelizer

Inherits:
Object
Defined in:
lib/cmdx/parallelizer.rb

Overview

Bounded thread pool that processes items concurrently.

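Distributes work across a fixed number of worker threads that pop items from a shared queue. Each result is stored at the index of the item that produced it, so results come back in submission order regardless of which item finishes first.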

Constructor Details

#initialize(items, pool_size: nil) ⇒ Parallelizer

Creates a new Parallelizer instance.

Examples:

Parallelizer.new([1, 2, 3], pool_size: 2)

Parameters:

  • items (Array)

    the items to process concurrently

  • pool_size (Integer) (defaults to: nil)

    number of threads (defaults to item count)

Rbs:

  • (Array items, ?pool_size: Integer) -> void



# File 'lib/cmdx/parallelizer.rb', line 41

def initialize(items, pool_size: nil)
  @items = items
  @pool_size = Integer(pool_size || items.size)
end
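
The pool-size expression in the constructor can be explored on its own. The sketch below is illustrative only: `resolve_pool_size` is a hypothetical helper, not part of CMDx, that repeats the same `Integer(pool_size || items.size)` expression to show the fallback to the item count and the coercion done by `Kernel#Integer`.

```ruby
# Hypothetical helper (not part of CMDx) that mirrors the constructor's
# pool-size expression so its behavior can be checked in isolation.
def resolve_pool_size(items, pool_size = nil)
  Integer(pool_size || items.size)
end

# Returns true when Kernel#Integer rejects the given pool size.
def rejected_pool_size?(items, pool_size)
  resolve_pool_size(items, pool_size)
  false
rescue ArgumentError
  true
end

p resolve_pool_size([1, 2, 3])         # nil falls back to items.size
p resolve_pool_size([1, 2, 3], 2)      # explicit Integer is used as given
p resolve_pool_size([1, 2, 3], "4")    # numeric String is parsed
p rejected_pool_size?([1, 2, 3], "many") # non-numeric String raises
```

Because the coercion happens in the constructor, an unusable `pool_size` fails at construction time rather than when worker threads are started.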

Instance Attribute Details

#items ⇒ Array (readonly)

Returns the items to process.

Examples:

parallelizer.items # => [1, 2, 3]

Returns:

  • (Array)

    the items to process

Rbs:



# File 'lib/cmdx/parallelizer.rb', line 18

def items
  @items
end

#pool_size ⇒ Integer (readonly)

Returns the number of threads in the pool.

Examples:

parallelizer.pool_size # => 4

Returns:

  • (Integer)

    the thread pool size

Rbs:

  • @pool_size: Integer



# File 'lib/cmdx/parallelizer.rb', line 28

def pool_size
  @pool_size
end

Class Method Details

.call(items, pool_size: nil) {|item| ... } ⇒ Array

Processes items concurrently and returns results in submission order.

Examples:

Parallelizer.call([1, 2, 3], pool_size: 2) { |n| n * 10 }
# => [10, 20, 30]

Parameters:

  • items (Array)

    the items to process concurrently

  • pool_size (Integer) (defaults to: nil)

    number of threads (defaults to item count)

Yields:

  • (item)

    block called for each item in a worker thread

Yield Parameters:

  • item (Object)

    an item from the items array

Yield Returns:

  • (Object)

    the result for this item

Returns:

  • (Array)

    results in the same order as input items

Rbs:

  • T, R

    (Array items, ?pool_size: Integer) { (T) -> R } -> Array



# File 'lib/cmdx/parallelizer.rb', line 62

def self.call(items, pool_size: nil, &block)
  new(items, pool_size:).call(&block)
end
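
To see why results keep their input positions even when later items finish first, the following sketch may help. It does not load CMDx: `ordered_parallel_map` is a hypothetical stand-in that follows the queue-and-sentinel approach shown in the `#call` source listing on this page, and the sleep durations are arbitrary values chosen for the demonstration.

```ruby
# Hypothetical stand-in for Parallelizer.call (not the library's code).
# It follows the same approach: indexed queue entries, one nil stop
# marker per worker, and results written by index.
def ordered_parallel_map(items, pool_size: nil, &block)
  size = Integer(pool_size || items.size)
  results = Array.new(items.size)
  queue = Queue.new

  items.each_with_index { |item, i| queue << [item, i] }
  size.times { queue << nil } # each worker stops when it pops a nil

  workers = Array.new(size) do
    Thread.new do
      while (entry = queue.pop)
        item, index = entry
        results[index] = block.call(item) # slot is fixed by input position
      end
    end
  end
  workers.each(&:join)

  results
end

# The first item sleeps longest and finishes last, yet stays first.
p ordered_parallel_map([3, 2, 1], pool_size: 3) { |n| sleep(n * 0.01); n * 10 }
# => [30, 20, 10]
```

Ordering here comes from writing to `results[index]`, not from the order in which threads complete, so no sorting step is needed afterwards.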

Instance Method Details

#call {|item| ... } ⇒ Array

Distributes items across the thread pool and returns results in submission order.

Examples:

Parallelizer.new(%w[a b c]).call { |s| s.upcase }
# => ["A", "B", "C"]

Yields:

  • (item)

    block called for each item in a worker thread

Yield Parameters:

  • item (Object)

    an item from the items array

Yield Returns:

  • (Object)

    the result for this item

Returns:

  • (Array)

    results in the same order as input items

Rbs:

  • T, R

    () { (T) -> R } -> Array



# File 'lib/cmdx/parallelizer.rb', line 80

def call(&block)
  results = Array.new(items.size)
  queue = Queue.new

  items.each_with_index { |item, i| queue << [item, i] }
  pool_size.times { queue << nil }

  Array.new(pool_size) do
    Thread.new do
      while (entry = queue.pop)
        item, index = entry
        results[index] = block.call(item) # rubocop:disable Performance/RedundantBlockCall
      end
    end
  end.each(&:join)

  results
end
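
Two consequences of the listing above are worth checking by experiment: no more than `pool_size` blocks run at once, and an exception raised inside a block reaches the caller through `Thread#join`. The sketch below is an assumption-laden illustration, not library code: `bounded_map`, `peak_concurrency`, and `surfaced_error` are hypothetical names, and `bounded_map` simply repeats the pattern from the listing so the example runs without CMDx.

```ruby
# Hypothetical stand-in repeating the pattern from the #call listing.
def bounded_map(items, pool_size:, &block)
  results = Array.new(items.size)
  queue = Queue.new
  items.each_with_index { |item, i| queue << [item, i] }
  pool_size.times { queue << nil }

  Array.new(pool_size) do
    Thread.new do
      while (entry = queue.pop)
        item, index = entry
        results[index] = block.call(item)
      end
    end
  end.each(&:join)

  results
end

# Records the highest number of blocks observed running at the same time.
def peak_concurrency(item_count, pool_size)
  lock = Mutex.new
  running = 0
  peak = 0
  bounded_map((1..item_count).to_a, pool_size: pool_size) do |_n|
    lock.synchronize do
      running += 1
      peak = running if running > peak
    end
    sleep(0.01)
    lock.synchronize { running -= 1 }
  end
  peak
end

# Returns the class of the error that join re-raises in the caller,
# or nil when no block raised.
def surfaced_error
  bounded_map([1, 2, 3], pool_size: 3) do |n|
    raise ArgumentError, "bad item" if n == 2
    n
  end
  nil
rescue ArgumentError => e
  e.class
end

puts peak_concurrency(6, 2) # never exceeds the pool size of 2
p surfaced_error            # ArgumentError
```

Ruby may also print the failing thread's backtrace to standard error, since `Thread.report_on_exception` is enabled by default. A worker that dies this way stops popping from the queue, so with a small pool some items can be left unprocessed when the error surfaces.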