
Conversation

takahiro-blab

Code summary:

  1. If JobFactory has a lambda, it now has one queue (@runloop_queue).
  2. Worker threads push their own queue onto JobFactory's queue.
  3. JobFactory pushes jobs into the threads' queues from the main thread (the caller of the Parallel singleton method).
  4. Parallel.in_threads counts finished threads. After all worker threads have finished, the last worker thread calls JobFactory#stopper, which stops the loop in the caller thread.
  5. If worker threads don't have their own queue (e.g. Parallel.work_in_ractors), JobFactory operates as before.
  6. If the source can be accessed by index (like Array), JobFactory operates as before.

Thanks to the clean design of the original code, these changes were possible with minimal modifications.
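To make the flow above concrete, here is a toy, self-contained sketch of the queue-of-queues idea. The names (stop sentinel, worker_queues, my_queue) are illustrative only and do not match lib/parallel.rb exactly:

```ruby
stop = :stop                          # sentinel, stands in for Parallel's Stop
jobs = [1, 2, 3].each                 # the enumerator source (the "@lambda")
worker_queues = Thread::Queue.new     # JobFactory's queue of worker queues

workers = 2.times.map do
  Thread.new do
    my_queue = Thread::Queue.new      # reused for every job this worker runs
    results = []
    loop do
      worker_queues << my_queue       # step 2: worker pushes its own queue
      item = my_queue.pop             # wait for the main thread to hand out a job
      break if item == stop
      results << item * 10            # do the work
    end
    results
  end
end

# step 3: the main thread pops worker queues and feeds them jobs,
# then a stop marker once the source is exhausted
done = 0
while done < workers.size
  queue = worker_queues.pop
  begin
    queue << jobs.next
  rescue StopIteration
    queue << stop
    done += 1
  end
end

workers.map(&:value).flatten.sort # => [10, 20, 30]
```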

All existing RSpec tests pass in my environment.
(I ran the tests on CRuby 3.4.3 on an x86_64 machine, excluding pending tests.)

Limitations:

  • These changes have not been benchmarked, so they may introduce some performance degradation.
  • The code has only been tested with CRuby (MRI), and has not been verified on other Ruby implementations such as JRuby.

This implementation uses several Thread::Queue instances. While this may not be the optimal approach, it helps address a few specific issues.

I have added some tests, but I wasn't completely sure where they should be placed in parallel_spec.rb, so I added them where they seemed to fit best. Feel free to move or adjust them if you think there's a better spot.

Comment on lines 110 to 112
return if @stopped
item = runloop_enq(queue_for_thread)
return if item == Stop
Owner
why is it not like this ?

Suggested change
- return if @stopped
- item = runloop_enq(queue_for_thread)
- return if item == Stop
+ item = runloop_enq(queue_for_thread)
+ @stopped = (item == Stop)
+ return if @stopped

Author
@takahiro-blab Apr 25, 2025

@stopped is set in JobFactory#runloop.

JobFactory#next may be called from several threads at the same time, so your @stopped = (item == Stop) would need exclusive control via Mutex#synchronize.

Taking a value out of @lambda and setting the result to @stopped must be handled in a critical section or by a single thread, I think. Otherwise, one thread could clear the @stopped flag set by another, which would be a bug.

That is why the previous implementation of Parallel used @mutex.synchronize. This PR's code handles @lambda and the item == Stop check in #runloop on a single thread, so @mutex is still passed in, but it's not used.
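For illustration, here is a deterministic replay of the race that the unsynchronized assignment could cause (stop_marker, item_a, and item_b are hypothetical names, not Parallel's internals):

```ruby
# Two workers each run `item = queue.pop; stopped = (item == stop_marker)`
# without a mutex. This replays one losing interleaving sequentially.
stop_marker = :stop
stopped = false
queue = Thread::Queue.new
queue << :job
queue << stop_marker

item_a = queue.pop                  # worker A takes :job
item_b = queue.pop                  # worker B takes the stop marker
stopped = (item_b == stop_marker)   # B runs its assignment first: stopped = true
stopped = (item_a == stop_marker)   # A's late assignment clears the flag again
stopped # the stop signal was lost
```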

Owner

ah thx, yeah this is a tricky section :)
can you leave a bit of inline comment for the gotchas

Author

I have added a comment to this code.

lib/parallel.rb Outdated
[item, index]
end

def runloop
Owner

would this make sense ?

Suggested change
- def runloop
+ def run_runloop

Owner

some method comments would help here too, what does it do exactly

Author

You're right. Please feel free to change anything.

array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) }
end

def enum_wrapper(source)
Owner

maybe give some examples of what types this is trying to detect

Author

You are asking about #enum_wrapper, aren't you?

This method aims to convert Enumerator instances and objects that include Enumerable into a Method instance that calls #next, but objects that are accessible via the [] method shouldn't be converted, because access by index is faster and avoids serialization problems.

So, first it checks for the [] method; if [] is available, it returns false. Next, if #next is available, it returns a Method instance.

For example:

enum_wrapper([1,2,3]) # -> false
enum_wrapper(1..5) # -> Method ( (1..5).method(:next) )
enum_wrapper(Prime.to_enum) # -> Method (See infinite_sequece.rb test case)
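A hypothetical re-sketch of that detection order (the real enum_wrapper in lib/parallel.rb differs in detail, e.g. in how it obtains the #next method):

```ruby
# Simplified stand-in for the detection logic described above.
def enum_wrapper(source)
  return false if source.respond_to?(:[])  # Arrays etc.: index access is faster
  enum = source.is_a?(Enumerator) ? source : source.to_enum
  enum.method(:next)                       # a Method that pulls items lazily
end

enum_wrapper([1, 2, 3])   # => false ([] is available)
enum_wrapper(1..5).call   # => 1 (first item via the wrapped #next)
```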

lib/parallel.rb Outdated
threads = []
- count, = extract_count_from_options(options)
+ count, options = extract_count_from_options(options)
+ finished_monitor = options[:runloop] && Queue.new(1..(count - 1)) # Insert values, one less in count than the number of threads.
Owner

explain why 1 less

Author

It's because the last thread must raise a ThreadError by calling #pop on the empty queue.

For example, if there are 5 worker threads (when options[:count] is 5), the finished_monitor queue holds 4 values.
Each of the five worker threads executes finished_monitor.pop(true), so 4 of them can get a value from finished_monitor.
But the last thread doesn't get a value, and a ThreadError exception is raised.
So JobFactory#stopper is called only once.
(I just realized that, perhaps, multiple calls of JobFactory#stopper may have no side effect... If so, this could be written more concisely without the finished_monitor queue.)

In the rescue section, the last thread calls stopper.call, which stops JobFactory#runloop.

Queue#pop raises a ThreadError when true is given as the argument and the queue is empty.
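As a standalone illustration of this trick (simplified, not the actual Parallel code):

```ruby
# 5 threads, but only 4 values in the queue: exactly one thread hits
# ThreadError and would be the one to call the stopper.
count = 5
finished_monitor = Thread::Queue.new
(1..(count - 1)).each { |i| finished_monitor.push(i) } # 4 values for 5 threads

stopper_calls = 0
threads = count.times.map do
  Thread.new do
    begin
      # ... the worker would do its job here ...
      finished_monitor.pop(true) # non-blocking pop: raises ThreadError when empty
    rescue ThreadError
      stopper_calls += 1         # only the one thread that finds the queue empty
    end
  end
end
threads.each(&:join)
stopper_calls # => 1, so the stopper would run exactly once
```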

Owner

ah got it, very complicated ... can you leave some short inline comment to explain a bit

Author

I have added a comment to this code, along with the question below.

lib/parallel.rb Outdated
yield(i)
ensure
begin
finished_monitor&.pop(true) # This must be executed even if the worker thread is killed (by #work_in_processes).
Owner

explain why it needs to be executed

Author

If that were not the case, JobFactory#runloop would block in queue = @runloop_queue.pop and could never return from #runloop.
To make sure the main thread's options[:runloop]&.call (JobFactory#stopper) finishes, each worker thread must call finished_monitor&.pop(true) even if it is killed. That is why it is in the ensure section.
(Please see the question above.)

(JobFactory#stopper pushes Stop onto JobFactory's @runloop_queue, which makes #runloop finish.)

This logic is also necessary when the run is terminated by Ctrl+C or by a worker throwing Parallel::Kill.
(When Parallel::Kill or Break is thrown, worker threads are killed by UserInterruptHandler.kill in #work_in_processes.)
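A small demonstration (CRuby) of the property this relies on: an ensure block still runs when a thread is killed:

```ruby
# Kill a sleeping thread and observe that its ensure block ran anyway.
ran_ensure = false
t = Thread.new do
  begin
    sleep # simulate a worker blocked on a job
  ensure
    ran_ensure = true # executed even though the thread is killed below
  end
end
sleep 0.01 until t.status == "sleep" # wait until the worker is blocked
t.kill
t.join
ran_ensure # => true
```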

Owner

thx, can you leave a bit of this inline for future archeologists :)

Author

I have added a comment, along with the question above.

lib/parallel.rb Outdated

UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
- in_threads(options) do |i|
+ in_threads(options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))) do |i|
Owner

can we reuse the options from line 408 ?

Author

Sorry, where is line 408?
If you mean the "thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))" from the discussion above, then of course we can extract that part into a variable and reuse it.

Author

I have fixed it and pushed the change in the same way as the other call site. Is this correct?

lib/parallel.rb Outdated
loop do
break if exception
- item, index = job_factory.next
+ item, index = job_factory.next(queue_for_thread)
Owner
@grosser Apr 25, 2025

could the factory take care of the queue handling by using Thread.current[:parallel_queue] ||= Thread::Queue.new or so ?

Author

The value of Thread#[] is fiber-local and varies when the fiber changes, so it may be better to use Thread#thread_variable_get and #thread_variable_set. Either way, using thread-local variables may handle the queue more simply.
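A quick sketch of the difference (fiber-local Thread#[] vs thread-local thread variables):

```ruby
# Set both kinds of variable, then read them from a new Fiber on the same
# thread: only the thread-local one is visible there.
t = Thread.new do
  Thread.current[:a] = 1                    # fiber-local storage
  Thread.current.thread_variable_set(:b, 2) # thread-local storage
  Fiber.new do
    [Thread.current[:a], Thread.current.thread_variable_get(:b)]
  end.resume
end
t.value # => [nil, 2] -- the fiber-local value is invisible inside a new fiber
```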

Author

I have made the change to use Thread#thread_variable_get and #thread_variable_set for the queue handling, and I have pushed it.

@grosser
Owner

grosser commented Apr 25, 2025

thanks, looks all very well thought out to fix these edge-cases :)

mostly looks good, but more comments would help make this easier to understand
runloop is kinda vague, if you have a better name that would be great

@takahiro-blab
Author

Thank you for your comments and reviews.
Handling threads is very difficult.

Certainly, runloop may be vague.
How about poploop, lambda_loop, feeder_loop?

@takahiro-blab
Author

I have created a commit accepting some of the suggestions.
In addition, I have committed changes to Readme.md related to this pull request.

lib/parallel.rb Outdated
[item, index]
end

def runloop
Owner

Suggested change
- def runloop
+ def consume_enumerator_queues

does this work ?

# consume items from enumerator queues until they stop producing

Author

I noticed the comment says "enumerator queues", but as far as I can tell, there's only one queue being consumed from in this method.
Does "enumerator queues" refer to JobFactory's @lambda?
Should this be singular instead?

@grosser
Owner

grosser commented Apr 28, 2025

would producer_queues or enumerator_queues make sense ?
and then consume_from_<x>_queues as method

@takahiro-blab
Author

Sorry, I am not very good at English,

would producer_queues or enumerator_queues make sense ?

but is this a reference to JobFactory's instance variable @runloop_queue?

If so, @runloop_queue is the line where workers queue up to get a job. (Actually, workers push their own queue onto @runloop_queue.)

So do these make sense?
Instance variable @runloop_queue -> @workers_line_queue
Method runloop -> distribute_work

@grosser
Owner

grosser commented May 1, 2025

maybe worker_queues ?

distribute_work -> consume_worker_queue ?

if the method will be used by everything, then something general like distribute_work is fine,
but if it only deals with the enumerator queues, then I'd like to be specific and consistent (use a consistent prefix/suffix like enumerator_queues) to make it clear which part of the codebase belongs to that feature

@grosser
Owner

grosser commented May 1, 2025

basically make it easy to ignore a big chunk of the code if debugging something unrelated and make it easy to spot all the things related when debugging enum bugs

- @runloop_queue -> @worker_queues
- runloop -> consume_worker_queues
- stopper -> stop
@takahiro-blab
Author

I have renamed JobFactory's instance variable @runloop_queue and the #runloop method.
And I have fixed the case where CI on Ruby 2.7 was failing.

@grosser
Owner

grosser commented May 3, 2025

thx for all the update ✨

I can make a separate PR to get rid of ruby 2.7 if that is causing issues, meant to do that for a while but never really broke anything.

Can you rename the last few runloop leftovers to worker_queue or something similar ?

Can it check queue.empty? instead of relying on an exception ? (because they are expensive and can lead to warnings)

@takahiro-blab
Author

takahiro-blab commented May 4, 2025

Can it check queue.empty? instead of relying on an exception ? (because they are expensive and can lead to warnings)

Well, without the exception, a mutex is necessary. Like this:

# In Parallel.in_threads method
  if options[:runloop]
    finished_monitor = Queue.new # In Ruby 3.0 or earlier, Queue#initialize doesn't accept initial values.
    (1..(count - 1)).each { |i| finished_monitor.push(i) }
  end
  runloop_stopper = options[:stopper]
  mutex = options[:runloop] ? Mutex.new : nil # ADD MUTEX

  Thread.handle_interrupt(Exception => :never) do
    Thread.handle_interrupt(Exception => :immediate) do
      count.times do |i|
        threads << Thread.new do
          yield(i)
        ensure
          mutex&.synchronize { # Add critical section
            if finished_monitor
              if finished_monitor.empty?
                runloop_stopper&.call
              else
                finished_monitor.pop
              end
            end
          }
        end
##### Omitted below #####

It's because several threads could check whether the queue is empty and call Queue#pop at the same time.
Without a mutex, after one thread checks whether the queue is empty, another thread may check and call Queue#pop, causing a race condition. So a mutex is necessary, and its overhead must be considered.
Queue#pop(true), on the other hand, is atomic at the Ruby level, so no additional mutex is needed.

However, thinking about it, if a mutex is used, it's not necessary to use a Queue, which is thread-safe; a mutex and a simple counter variable are enough.
So the above code can be rewritten like this:

# In Parallel.in_threads method
  counter = count - 1 # COUNTER
  runloop_stopper = options[:stopper]
  mutex = options[:runloop] ? Mutex.new : nil # ADD MUTEX

  Thread.handle_interrupt(Exception => :never) do
    Thread.handle_interrupt(Exception => :immediate) do
      count.times do |i|
        threads << Thread.new do
          yield(i)
        ensure
          mutex&.synchronize { # Add critical section
            runloop_stopper&.call if counter <= 0 # the last thread to finish calls the stopper
            counter -= 1
          }
        end
##### Omitted below #####

I haven't tested the two snippets above. However, I think my original implementation, which uses one queue and an exception, is simpler. There is no single right answer because this is a subjective issue, isn't it?
Which do you think is better?

You are welcome to push any code you think is good to the topic branch, not just for this point but also for the variable names.

@takahiro-blab
Author

After considering it, I think using a mutex and a counter is good, so I have pushed the change.
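For reference, a standalone sanity check of the mutex + counter variant (simplified, not the actual Parallel code):

```ruby
# 5 threads, counter starts one below the thread count; the mutex serializes
# the ensure blocks, so only the last thread to finish sees counter <= 0.
count = 5
counter = count - 1        # one fewer than the number of threads
stopper_calls = 0
mutex = Mutex.new

threads = count.times.map do
  Thread.new do
    begin
      # ... the worker would do its job here ...
    ensure
      mutex.synchronize do
        stopper_calls += 1 if counter <= 0 # only the last thread to finish
        counter -= 1
      end
    end
  end
end
threads.each(&:join)
stopper_calls # => 1
```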

Comment on lines +114 to +116
# so it's not necessary to check for Stop here.
item = worker_queues_enq(queue_for_thread)
return if item == Stop
Owner

comment says that we don't need to check for stop but then we check for stop ?

Author
@takahiro-blab May 12, 2025

This "check for Stop" means assigning item == Stop to @stopped. The comment means "we must not assign the check result to @stopped"; it does not refer to the early return.

def consume_worker_queue
return unless @worker_queues

loop do
Owner

is this right ?

Suggested change
- loop do
+ # every time a thread wants to start work, it adds a new queue; we pop a queue here until everything is done (Stop),
+ # then push a new item into that queue for the thread to read and work on
+ loop do

Stop
end

def worker_queues_enq(queue_for_thread)
Owner

would this also work if we did:

def worker_queues_enq
  queue = Thread::Queue.new
  @worker_queues.push(queue)
  queue.pop # Wait for @lambda to give us an item to work on
end

so the caller has less state to take care of

Owner

... also maybe method name wait_for_item ?

Author

Your code also works. However, it creates as many queues as there are jobs, and those queues remain in memory until the GC collects them. I don't consider creating a new queue for every job a good implementation.
Furthermore, when we look back at this later, we would wonder, "Why does this code create a new queue here?", wouldn't we?

In a straightforward implementation, I would expect the queues to be reused, so why make a new queue every time?

Owner

makes sense to reuse the queues, was mostly trying to understand if that's how it is supposed to work :)

@grosser
Owner

grosser commented May 7, 2025

code is getting more obvious 👍 (or I read it too many times already :D)

@grosser
Owner

grosser commented May 13, 2025

rubocop needs a small fix, otherwise looks good 🤞

Successfully merging this pull request may close these issues:

  • Lambda producer should be run in the main thread
  • Infinite series expansion
  • Does not work with enumerators
2 participants