Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
36af69b
Invoke lambda in the main thread and add support for infinite sequences
takahiro-blab Apr 24, 2025
28b74f9
Add test for infinite sequence
takahiro-blab Apr 24, 2025
5d5eb63
Add test for stopping by StopIteration
takahiro-blab Apr 24, 2025
9d9bd11
Add test for lambda called in same thread
takahiro-blab Apr 24, 2025
756685f
Add comment on JobFactory#runloop
takahiro-blab Apr 28, 2025
60ff3ac
Rewrite JobFactory#stopper from endless (one-line) methods to legacy …
takahiro-blab Apr 28, 2025
4823fcc
Extract merged options into a variable before calling Parallel.in_thr…
takahiro-blab Apr 28, 2025
3eda109
Add comment spec/cases/infinite_sequence.rb
takahiro-blab Apr 28, 2025
7e8a77c
Make thread queue handing use Thread#thread_variable_set and #thread_…
takahiro-blab Apr 28, 2025
20cb415
Update Readme.md
takahiro-blab Apr 28, 2025
7e76725
Add comment on JobFactory#enum_wrapper
takahiro-blab Apr 30, 2025
5eea888
Rename JobFactory's some methods and variables.
takahiro-blab May 1, 2025
ec0083b
Fix for support Ruby 3.0 and earlier.
takahiro-blab May 1, 2025
074eab7
Add comment
takahiro-blab May 1, 2025
70208e6
Rename last few runloop leftovers to worker_queue
takahiro-blab May 4, 2025
98a74e4
Make use a mutex and a counter instead of a Queue in Parallel.in_threads
takahiro-blab May 4, 2025
f3a9dd2
Update comment of Parallel.in_threads in lib/parallel.rb
takahiro-blab May 12, 2025
9b9f4bd
Rename `JobFactory#consume_worker_queue` to `consume_worker_queues`
takahiro-blab May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }
```

Iterations can be stopped by raising StopIteration.

```Ruby
items = [1,2,3]
Parallel.each( -> { items.pop || raise(StopIteration) }) { |number| ... }
```

Also supports Enumerator instances as a source.

```Ruby
enumerator = Enumerator.new do |y|
y << 1; y << 2; y << 3
end
Parallel.each( enumerator ) { |number| ... }
```

Also supports `any?` or `all?`

```Ruby
Expand Down
86 changes: 78 additions & 8 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,30 @@ def wait

class JobFactory
def initialize(source, mutex)
@lambda = (source.respond_to?(:call) && source) || queue_wrapper(source)
@source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array
@lambda = enum_wrapper(source) || (source.respond_to?(:call) && source) || queue_wrapper(source)
@source = source.to_a unless @lambda # turn non-Enumerable-s into an Array
@worker_queues = Thread::Queue.new if @lambda
@mutex = mutex
@index = -1
@stopped = false
end

def next
if producer?
queue_for_thread = Thread.current.thread_variable_get(:parallel_queue)
if @worker_queues && queue_for_thread
return if @stopped
# This #next method may be called from some threads at the same time.
# The main (Parallel's singleton method caller) thread calls @lambda and checks `item == Stop`,
# so it's not necessary to check for Stop here.
item = worker_queues_enq(queue_for_thread)
return if item == Stop
Comment on lines +114 to +116
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment says that we don't need to check for stop but then we check for stop ?

Copy link
Author

@takahiro-blab takahiro-blab May 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This "check for stop" means assigning item == Stop to @stopped. This comment means "We must not assign check result to @stopped." The comment does not refer to the returning....

index = @index += 1
elsif producer?
# - index and item stay in sync
# - do not call lambda after it has returned Stop
item, index = @mutex.synchronize do
return if @stopped
item = @lambda.call
item = call_lambda
@stopped = (item == Stop)
return if @stopped
[item, @index += 1]
Expand All @@ -123,6 +133,26 @@ def next
[item, index]
end

def consume_worker_queues
return unless @worker_queues

loop do
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this right ?

Suggested change
loop do
# every time a threads wants to start work, it adds a new queue, we pop the queue here until everything is done (stop)
# then push a new item into the queue for the thread to read and work on
loop do

queue = @worker_queues.pop
return if queue == Stop
item = call_lambda
queue.push(item)
break if item == Stop
end
@stopped = true
# clear out all work queues by adding a "stop" to them which will stop the thread working on them
begin
while queue = @worker_queues.pop(true)
queue.push(Stop) if queue != Stop # Unlock waiting threads.
end
rescue ThreadError # All threads are unlocked.
end
end

def size
if producer?
Float::INFINITY
Expand All @@ -142,15 +172,36 @@ def unpack(data)
producer? ? data : [@source[data], data]
end

def stop
@worker_queues&.push(Stop)
end

private

def call_lambda
@lambda.call
rescue StopIteration
Stop
end

def worker_queues_enq(queue_for_thread)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this also work if we did:

def worker_queues_enq
  queue = Thread::Queue.new
  @worker_queues.push(queue)
  queue.pop # Wait until @lambda to give us an item to work on
end

so the caller has less state to take care of

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... also maybe method name wait_for_item ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your code also works. However, well, your code makes as many queues as jobs. Those queues remain in memory until they are collected by the GC. I do not consider creating a new queue for every jobs a good implementation.
Furthermore, when we look back on it later, we will wonder “Why does this code create a new queue here?”, wouldn't we think?

In a frank implementation, I would think the queues would be reused, but why make a queue every time?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to reuse the queues, was mostly trying to understand if that's how it is supposed to work :)

@worker_queues.push(queue_for_thread)
queue_for_thread.pop # Wait until @lambda returns.
end

def producer?
@lambda
end

def queue_wrapper(array)
array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) }
end

# returns the `next` method if the source in an enumerator and cannot be accessed by anything more efficient like []
def enum_wrapper(source)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe give some examples of what types this is trying to detect

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are asking about #enum_wrapper , aren't you?

This method aims converting Enumerator instance and objects including Enumerable to Method instance which calls #next , but objects which is accessible by [] method shouldn't be converted. It's because accessing by index is faster and it can avoid serializing problems, you know.

So, as first, checking [] method, if [] method is available, it returns false. Next, if #next method is available, returns Method instance.

For example:

enum_wrapper([1,2,3]) # -> false
enum_wrapper(1..5) # -> Method ( (1..5).method(:next) )
enum_wrapper(Prime.to_enum) # -> Method (See infinite_sequece.rb test case)

# Convert what is inaccessible by the index
!source.respond_to?(:[]) && source.respond_to?(:next) && source.method(:next)
end
end

class UserInterruptHandler
Expand Down Expand Up @@ -211,13 +262,28 @@ def restore_interrupt(old, signal)
class << self
def in_threads(options = { count: 2 })
threads = []
count, = extract_count_from_options(options)
count, options = extract_count_from_options(options)

counter = count # worker thread remaining counter
mutex = options[:consume_worker_queues] ? Mutex.new : nil
consume_worker_queues_stopper = options[:stopper]

Thread.handle_interrupt(Exception => :never) do
Thread.handle_interrupt(Exception => :immediate) do
count.times do |i|
threads << Thread.new { yield(i) }
threads << Thread.new do
yield(i)
ensure
mutex&.synchronize do
if counter <= 1
# last thread needs to stop the worker queue processing
consume_worker_queues_stopper.call
end
counter -= 1
end
end
end
options[:consume_worker_queues]&.call # Invoke lambda in caller thread, and provide jobs to thread queue.
threads.map(&:value)
end
ensure
Expand Down Expand Up @@ -431,7 +497,9 @@ def work_in_threads(job_factory, options, &block)
results_mutex = Mutex.new # arrays are not thread-safe on jRuby
exception = nil

in_threads(options) do |worker_num|
thread_options = options.merge(consume_worker_queues: job_factory.method(:consume_worker_queues), stopper: job_factory.method(:stop))
in_threads(thread_options) do |worker_num|
Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new)
self.worker_number = worker_num
# as long as there are more jobs, work on one of them
while !exception && (set = job_factory.next)
Expand Down Expand Up @@ -523,9 +591,11 @@ def work_in_processes(job_factory, options, &blk)
exception = nil

UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
in_threads(options) do |i|
thread_options = options.merge(consume_worker_queues: job_factory.method(:consume_worker_queues), stopper: job_factory.method(:stop))
in_threads(thread_options) do |i|
worker = workers[i]
worker.thread = Thread.current
Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new)
worked = false

begin
Expand Down
24 changes: 24 additions & 0 deletions spec/cases/infinite_sequence.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

# Reproduction case based on GitHub Issue #211
# Original code provided by @cyclotron3k in the issue
# using a enum that is infinite, so this will hang forever when trying to convert to an array

require 'prime'
require './spec/cases/helper'

private_key = 12344567899

results = []

[{ in_threads: 2 }, { in_threads: 0 }].each do |options|
primes = Prime.to_enum
Parallel.each(primes, options) do |prime|
if private_key % prime == 0
results << prime.to_s
raise Parallel::Break
end
end
end

print results.join(',')
21 changes: 21 additions & 0 deletions spec/cases/lambda_call_same_thread.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true
require './spec/cases/helper'

runner_thread = nil
all = [3, 2, 1]
my_proc = proc {
runner_thread ||= Thread.current
if Thread.current != runner_thread
raise "proc is called in different thread!"
end

all.pop || Parallel::Stop
}

class Callback
def self.call(x)
$stdout.sync = true
"ITEM-#{x}"
end
end
puts(Parallel.map(my_proc, in_threads: 2) { |(i, _id)| Callback.call i })
21 changes: 21 additions & 0 deletions spec/cases/lambda_can_stop_by_exception.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true
require './spec/cases/helper'

def generate_proc
count = 0
proc {
raise StopIteration if 3 <= count
count += 1
}
end

class Callback
def self.call(x)
$stdout.sync = true
"ITEM-#{x}"
end
end

[{ in_processes: 2 }, { in_threads: 2 }, { in_threads: 0 }].each do |options|
puts(Parallel.map(generate_proc, options) { |(i, _id)| Callback.call i })
end
12 changes: 12 additions & 0 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ def cpus
end
end

it "can process infinite sequence enumerator" do
ruby("spec/cases/infinite_sequence.rb").split(',').should == ['139'] * 2
end

it "can be finished by lambda raising StopIteration" do
ruby("spec/cases/lambda_can_stop_by_exception.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" * 3
end

it "must call lambda in same thread" do
ruby("spec/cases/lambda_call_same_thread.rb").should == "ITEM-1\nITEM-2\nITEM-3\n"
end

it "fails when running with a prefilled queue without stop since there are no threads to fill it" do
error = (RUBY_VERSION >= "2.0.0" ? "No live threads left. Deadlock?" : "deadlock detected (fatal)")
ruby("spec/cases/fatal_queue.rb 2>&1").should include error
Expand Down
Loading