Class: CMDx::Parallelizer
- Inherits:
-
Object
- Object
- CMDx::Parallelizer
- Defined in:
- lib/cmdx/parallelizer.rb
Overview
Bounded thread pool that processes items concurrently.
Distributes work across a fixed number of threads using a queue, collecting results in submission order.
Instance Attribute Summary collapse
-
#items ⇒ Array
readonly
Returns the items to process.
-
#pool_size ⇒ Integer
readonly
Returns the number of threads in the pool.
Class Method Summary collapse
-
.call(items, pool_size: nil) {|item| ... } ⇒ Array
Processes items concurrently and returns results in submission order.
Instance Method Summary collapse
-
#call {|item| ... } ⇒ Array
Distributes items across the thread pool and returns results in submission order.
-
#initialize(items, pool_size: nil) ⇒ Parallelizer
constructor
Creates a new Parallelizer instance.
Constructor Details
#initialize(items, pool_size: nil) ⇒ Parallelizer
Creates a new Parallelizer instance.
41 42 43 44 |
# File 'lib/cmdx/parallelizer.rb', line 41 def initialize(items, pool_size: nil) @items = items @pool_size = Integer(pool_size || items.size) end |
Instance Attribute Details
#items ⇒ Array (readonly)
Returns the items to process.
18 19 20 |
# File 'lib/cmdx/parallelizer.rb', line 18 def items @items end |
#pool_size ⇒ Integer (readonly)
Returns the number of threads in the pool.
28 29 30 |
# File 'lib/cmdx/parallelizer.rb', line 28 def pool_size @pool_size end |
Class Method Details
.call(items, pool_size: nil) {|item| ... } ⇒ Array
Processes items concurrently and returns results in submission order.
62 63 64 |
# File 'lib/cmdx/parallelizer.rb', line 62 def self.call(items, pool_size: nil, &block) new(items, pool_size:).call(&block) end |
Instance Method Details
#call {|item| ... } ⇒ Array
Distributes items across the thread pool and returns results in submission order.
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/cmdx/parallelizer.rb', line 80 def call(&block) results = Array.new(items.size) queue = Queue.new items.each_with_index { |item, i| queue << [item, i] } pool_size.times { queue << nil } Array.new(pool_size) do Thread.new do while (entry = queue.pop) item, index = entry results[index] = block.call(item) # rubocop:disable Performance/RedundantBlockCall end end end.each(&:join) results end |