From 36af69b580bee0e32e7f740fccddefecf8ff9478 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 24 Apr 2025 18:40:39 +0900 Subject: [PATCH 01/18] Invoke lambda in the main thread and add support for infinite sequences --- lib/parallel.rb | 78 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 67 insertions(+), 11 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index d0c3f37..a785ae8 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -97,20 +97,26 @@ def wait class JobFactory def initialize(source, mutex) - @lambda = (source.respond_to?(:call) && source) || queue_wrapper(source) - @source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array + @lambda = enum_wrapper(source) || (source.respond_to?(:call) && source) || queue_wrapper(source) + @source = source.to_a unless @lambda # turn non-Enumerable-s into an Array + @runloop_queue = Thread::Queue.new if @lambda @mutex = mutex @index = -1 @stopped = false end - def next - if producer? + def next(queue_for_thread = nil) + if @runloop_queue && queue_for_thread + return if @stopped + item = runloop_enq(queue_for_thread) + return if item == Stop + index = @index += 1 + elsif producer? # - index and item stay in sync # - do not call lambda after it has returned Stop item, index = @mutex.synchronize do return if @stopped - item = @lambda.call + item = call_lambda @stopped = (item == Stop) return if @stopped [item, @index += 1] @@ -123,6 +129,25 @@ def next [item, index] end + def runloop + return unless @runloop_queue + + loop do + queue = @runloop_queue.pop + return if queue == Stop + item = call_lambda + queue.push(item) + break if item == Stop + end + @stopped = true + begin + while queue = @runloop_queue.pop(true) + queue.push(Stop) if queue != Stop # Unlock waiting threads. + end + rescue ThreadError # All threads are unlocked. + end + end + def size if producer? 
Float::INFINITY @@ -142,8 +167,21 @@ def unpack(data) producer? ? data : [@source[data], data] end + def stopper = @runloop_queue&.push(Stop) + private + def call_lambda + @lambda.call + rescue StopIteration + Stop + end + + def runloop_enq(queue_for_thread) + @runloop_queue.push(queue_for_thread) + queue_for_thread.pop # Wait until @lambda returns. + end + def producer? @lambda end @@ -151,6 +189,11 @@ def producer? def queue_wrapper(array) array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) } end + + def enum_wrapper(source) + # Convert what is inaccessible by the index + !source.respond_to?(:[]) && source.respond_to?(:next) && source.method(:next) + end end class UserInterruptHandler @@ -211,13 +254,24 @@ def restore_interrupt(old, signal) class << self def in_threads(options = { count: 2 }) threads = [] - count, = extract_count_from_options(options) + count, options = extract_count_from_options(options) + finished_monitor = options[:runloop] && Queue.new(1..(count - 1)) # Insert values, one less in count than the number of threads. + stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do Thread.handle_interrupt(Exception => :immediate) do count.times do |i| - threads << Thread.new { yield(i) } + threads << Thread.new do + yield(i) + ensure + begin + finished_monitor&.pop(true) # This must be executed even if the worker thread is killed (by #work_in_processes). + rescue ThreadError # Queue#pop raises ThreadError when the queue is empty. + stopper&.call # Stop JobFactory#runloop + end + end end + options[:runloop]&.call # Invoke lambda in caller thread, and provide jobs to thread queue. 
threads.map(&:value) end ensure @@ -431,10 +485,11 @@ def work_in_threads(job_factory, options, &block) results_mutex = Mutex.new # arrays are not thread-safe on jRuby exception = nil - in_threads(options) do |worker_num| + in_threads(options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))) do |worker_num| + queue_for_thread = Thread::Queue.new self.worker_number = worker_num # as long as there are more jobs, work on one of them - while !exception && (set = job_factory.next) + while !exception && (set = job_factory.next(queue_for_thread)) begin item, index = set result = with_instrumentation item, index, options do @@ -523,15 +578,16 @@ def work_in_processes(job_factory, options, &blk) exception = nil UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do - in_threads(options) do |i| + in_threads(options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))) do |i| worker = workers[i] worker.thread = Thread.current + queue_for_thread = Thread::Queue.new worked = false begin loop do break if exception - item, index = job_factory.next + item, index = job_factory.next(queue_for_thread) break unless index if options[:isolation] From 28b74f996092940e4a1ced07451c0d071bc182e3 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 24 Apr 2025 18:42:41 +0900 Subject: [PATCH 02/18] Add test for infinite sequence --- spec/cases/infinite_sequence.rb | 23 +++++++++++++++++++++++ spec/parallel_spec.rb | 4 ++++ 2 files changed, 27 insertions(+) create mode 100644 spec/cases/infinite_sequence.rb diff --git a/spec/cases/infinite_sequence.rb b/spec/cases/infinite_sequence.rb new file mode 100644 index 0000000..2b3021e --- /dev/null +++ b/spec/cases/infinite_sequence.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +# Reproduction case based on GitHub Issue #211 +# Original code provided by @cyclotron3k in the issue + +require 'prime' +require 
'./spec/cases/helper' + +private_key = 12344567899 + +results = [] + +[{ in_threads: 2 }, { in_threads: 0 }].each do |options| + primes = Prime.to_enum + Parallel.each(primes, options) do |prime| + if private_key % prime == 0 + results << prime.to_s + raise Parallel::Break + end + end +end + +print results.join(',') diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 1589f0c..8104232 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -722,6 +722,10 @@ def cpus end end + it "can process infinite sequence enumerator" do + ruby("spec/cases/infinite_sequence.rb").split(',').should == ['139'] * 2 + end + it "fails when running with a prefilled queue without stop since there are no threads to fill it" do error = (RUBY_VERSION >= "2.0.0" ? "No live threads left. Deadlock?" : "deadlock detected (fatal)") ruby("spec/cases/fatal_queue.rb 2>&1").should include error From 5d5eb63770dc02d193004ff4f22185bb122f273c Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 24 Apr 2025 21:07:07 +0900 Subject: [PATCH 03/18] Add test for stopping by StopIteration --- spec/cases/lambda_can_stop_by_exception.rb | 21 +++++++++++++++++++++ spec/parallel_spec.rb | 4 ++++ 2 files changed, 25 insertions(+) create mode 100644 spec/cases/lambda_can_stop_by_exception.rb diff --git a/spec/cases/lambda_can_stop_by_exception.rb b/spec/cases/lambda_can_stop_by_exception.rb new file mode 100644 index 0000000..da8b59e --- /dev/null +++ b/spec/cases/lambda_can_stop_by_exception.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +def generate_proc + count = 0 + proc { + raise StopIteration if 3 <= count + count += 1 + } +end + +class Callback + def self.call(x) + $stdout.sync = true + "ITEM-#{x}" + end +end + +[{ in_processes: 2 }, { in_threads: 2 }, { in_threads: 0 }].each do |options| + puts(Parallel.map(generate_proc, options) { |(i, _id)| Callback.call i }) +end diff --git 
a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 8104232..3127f32 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -726,6 +726,10 @@ def cpus ruby("spec/cases/infinite_sequence.rb").split(',').should == ['139'] * 2 end + it "can be finished by lambda raising StopIteration" do + ruby("spec/cases/lambda_can_stop_by_exception.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" * 3 + end + it "fails when running with a prefilled queue without stop since there are no threads to fill it" do error = (RUBY_VERSION >= "2.0.0" ? "No live threads left. Deadlock?" : "deadlock detected (fatal)") ruby("spec/cases/fatal_queue.rb 2>&1").should include error From 9d9bd11908afe539c5ab5dfe5683a0cac516ebad Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 24 Apr 2025 23:14:45 +0900 Subject: [PATCH 04/18] Add test for lambda called in same thread --- spec/cases/lambda_call_same_thread.rb | 21 +++++++++++++++++++++ spec/parallel_spec.rb | 4 ++++ 2 files changed, 25 insertions(+) create mode 100644 spec/cases/lambda_call_same_thread.rb diff --git a/spec/cases/lambda_call_same_thread.rb b/spec/cases/lambda_call_same_thread.rb new file mode 100644 index 0000000..a1a7719 --- /dev/null +++ b/spec/cases/lambda_call_same_thread.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +runner_thread = nil +all = [3, 2, 1] +my_proc = proc { + runner_thread ||= Thread.current + if Thread.current != runner_thread + raise "proc is called in different thread!" 
+ end + + all.pop || Parallel::Stop +} + +class Callback + def self.call(x) + $stdout.sync = true + "ITEM-#{x}" + end +end +puts(Parallel.map(my_proc, in_threads: 2) { |(i, _id)| Callback.call i }) diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 3127f32..e6835b0 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -730,6 +730,10 @@ def cpus ruby("spec/cases/lambda_can_stop_by_exception.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" * 3 end + it "must call lambda in same thread" do + ruby("spec/cases/lambda_call_same_thread.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" + end + it "fails when running with a prefilled queue without stop since there are no threads to fill it" do error = (RUBY_VERSION >= "2.0.0" ? "No live threads left. Deadlock?" : "deadlock detected (fatal)") ruby("spec/cases/fatal_queue.rb 2>&1").should include error From 756685f7c195273b3926bf4082bdb0cc7c707322 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:12:11 +0900 Subject: [PATCH 05/18] Add comment on JobFactory#runloop Co-authored-by: Michael Grosser --- lib/parallel.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/parallel.rb b/lib/parallel.rb index a785ae8..752bfd3 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -140,6 +140,7 @@ def runloop break if item == Stop end @stopped = true + # clear out all work queues by adding a "stop" to them which will stop the thread working on them begin while queue = @runloop_queue.pop(true) queue.push(Stop) if queue != Stop # Unlock waiting threads. 
From 60ff3ac77d8cf298bd8a28ca59915730b7bd69b7 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:14:49 +0900 Subject: [PATCH 06/18] Rewrite JobFactory#stopper from endless (one-line) methods to legacy style Co-authored-by: Michael Grosser --- lib/parallel.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 752bfd3..2433834 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -168,7 +168,9 @@ def unpack(data) producer? ? data : [@source[data], data] end - def stopper = @runloop_queue&.push(Stop) + def stopper + @runloop_queue&.push(Stop) + end private From 4823fcc9070e6b998a955219f1d3e3c62aa5e586 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:30:14 +0900 Subject: [PATCH 07/18] Extract merged options into a variable before calling Parallel.in_threads Co-authored-by: Michael Grosser --- lib/parallel.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 2433834..1f95d31 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -488,7 +488,8 @@ def work_in_threads(job_factory, options, &block) results_mutex = Mutex.new # arrays are not thread-safe on jRuby exception = nil - in_threads(options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper))) do |worker_num| + thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper)) + in_threads(thread_options) do |worker_num| queue_for_thread = Thread::Queue.new self.worker_number = worker_num # as long as there are more jobs, work on one of them @@ -581,7 +582,8 @@ def work_in_processes(job_factory, options, &blk) exception = nil UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do - in_threads(options.merge(runloop: job_factory.method(:runloop), stopper: 
job_factory.method(:stopper))) do |i| + thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper)) + in_threads(thread_options) do |i| worker = workers[i] worker.thread = Thread.current queue_for_thread = Thread::Queue.new From 3eda10981478e7d997f3093c7dc6dd02623769dd Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:35:40 +0900 Subject: [PATCH 08/18] Add comment spec/cases/infinite_sequence.rb Co-authored-by: Michael Grosser --- spec/cases/infinite_sequence.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/cases/infinite_sequence.rb b/spec/cases/infinite_sequence.rb index 2b3021e..14a393a 100644 --- a/spec/cases/infinite_sequence.rb +++ b/spec/cases/infinite_sequence.rb @@ -2,6 +2,7 @@ # Reproduction case based on GitHub Issue #211 # Original code provided by @cyclotron3k in the issue +# using an enum that is infinite, so this will hang forever when trying to convert to an array require 'prime' require './spec/cases/helper' From 7e8a77c54db9042a6f6fa8166bee81530fdf9072 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 28 Apr 2025 15:12:08 +0900 Subject: [PATCH 09/18] Make thread queue handling use Thread#thread_variable_set and #thread_variable_get --- lib/parallel.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 1f95d31..d920754 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -105,7 +105,8 @@ def initialize(source, mutex) @stopped = false end - def next(queue_for_thread = nil) + def next + queue_for_thread = Thread.current.thread_variable_get(:parallel_queue) if @runloop_queue && queue_for_thread return if @stopped item = runloop_enq(queue_for_thread) @@ -490,10 +491,10 @@ def work_in_threads(job_factory, options, &block) thread_options = options.merge(runloop: job_factory.method(:runloop), stopper:
job_factory.method(:stopper)) in_threads(thread_options) do |worker_num| - queue_for_thread = Thread::Queue.new + Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) self.worker_number = worker_num # as long as there are more jobs, work on one of them - while !exception && (set = job_factory.next(queue_for_thread)) + while !exception && (set = job_factory.next) begin item, index = set result = with_instrumentation item, index, options do @@ -586,13 +587,13 @@ def work_in_processes(job_factory, options, &blk) in_threads(thread_options) do |i| worker = workers[i] worker.thread = Thread.current - queue_for_thread = Thread::Queue.new + Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) worked = false begin loop do break if exception - item, index = job_factory.next(queue_for_thread) + item, index = job_factory.next break unless index if options[:isolation] From 20cb415f9a9314625a94e8546a0a2faf99e2c944 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 28 Apr 2025 15:34:02 +0900 Subject: [PATCH 10/18] Update Readme.md --- Readme.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Readme.md b/Readme.md index 8168b89..c389721 100644 --- a/Readme.md +++ b/Readme.md @@ -46,6 +46,22 @@ items = [1,2,3] Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... } ``` +Iterations can be stopped by raising StopIteration. + +```Ruby +items = [1,2,3] +Parallel.each( -> { items.pop || raise(StopIteration) }) { |number| ... } +``` + +Also supports Enumerator instances as a source. + +```Ruby +enumerator = Enumerator.new do |y| + y << 1; y << 2; y << 3 +end +Parallel.each( enumerator ) { |number| ... 
} +``` + Also supports `any?` or `all?` ```Ruby From 7e767252eff3f41fe959149e2b7a6841c80e100e Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Wed, 30 Apr 2025 18:22:45 +0900 Subject: [PATCH 11/18] Add comment on JobFactory#enum_wrapper Co-authored-by: Michael Grosser --- lib/parallel.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/parallel.rb b/lib/parallel.rb index d920754..9a28542 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -194,6 +194,7 @@ def queue_wrapper(array) array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) } end + # returns the `next` method if the source is an enumerator and cannot be accessed by anything more efficient like [] def enum_wrapper(source) # Convert what is inaccessible by the index !source.respond_to?(:[]) && source.respond_to?(:next) && source.method(:next) From 5eea88894426bd7a558074412ad423176cb05160 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 1 May 2025 15:20:17 +0900 Subject: [PATCH 12/18] Rename some of JobFactory's methods and variables.
- @runloop_queue -> @worker_queues - runloop -> consume_worker_queue - stopper -> stop --- lib/parallel.rb | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 9a28542..d4a793d 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -99,7 +99,7 @@ class JobFactory def initialize(source, mutex) @lambda = enum_wrapper(source) || (source.respond_to?(:call) && source) || queue_wrapper(source) @source = source.to_a unless @lambda # turn non-Enumerable-s into an Array - @runloop_queue = Thread::Queue.new if @lambda + @worker_queues = Thread::Queue.new if @lambda @mutex = mutex @index = -1 @stopped = false @@ -107,7 +107,7 @@ def initialize(source, mutex) def next queue_for_thread = Thread.current.thread_variable_get(:parallel_queue) - if @runloop_queue && queue_for_thread + if @worker_queues && queue_for_thread return if @stopped item = runloop_enq(queue_for_thread) return if item == Stop @@ -130,11 +130,11 @@ def next [item, index] end - def runloop - return unless @runloop_queue + def consume_worker_queue + return unless @worker_queues loop do - queue = @runloop_queue.pop + queue = @worker_queues.pop return if queue == Stop item = call_lambda queue.push(item) @@ -143,7 +143,7 @@ def runloop @stopped = true # clear out all work queues by adding a "stop" to them which will stop the thread working on them begin - while queue = @runloop_queue.pop(true) + while queue = @worker_queues.pop(true) queue.push(Stop) if queue != Stop # Unlock waiting threads. end rescue ThreadError # All threads are unlocked. @@ -169,8 +169,8 @@ def unpack(data) producer? ? data : [@source[data], data] end - def stopper - @runloop_queue&.push(Stop) + def stop + @worker_queues&.push(Stop) end private @@ -182,7 +182,7 @@ def call_lambda end def runloop_enq(queue_for_thread) - @runloop_queue.push(queue_for_thread) + @worker_queues.push(queue_for_thread) queue_for_thread.pop # Wait until @lambda returns.
end @@ -261,7 +261,7 @@ def in_threads(options = { count: 2 }) threads = [] count, options = extract_count_from_options(options) finished_monitor = options[:runloop] && Queue.new(1..(count - 1)) # Insert values, one less in count than the number of threads. - stopper = options[:stopper] + runloop_stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do Thread.handle_interrupt(Exception => :immediate) do @@ -272,7 +272,7 @@ def in_threads(options = { count: 2 }) begin finished_monitor&.pop(true) # This must be executed even if the worker thread is killed (by #work_in_processes). rescue ThreadError # Queue#pop raises ThreadError when the queue is empty. - stopper&.call # Stop JobFactory#runloop + runloop_stopper&.call # Stop JobFactory#consume_worker_queue end end end @@ -490,7 +490,7 @@ def work_in_threads(job_factory, options, &block) results_mutex = Mutex.new # arrays are not thread-safe on jRuby exception = nil - thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper)) + thread_options = options.merge(runloop: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) in_threads(thread_options) do |worker_num| Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) self.worker_number = worker_num @@ -584,7 +584,7 @@ def work_in_processes(job_factory, options, &blk) exception = nil UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do - thread_options = options.merge(runloop: job_factory.method(:runloop), stopper: job_factory.method(:stopper)) + thread_options = options.merge(runloop: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) in_threads(thread_options) do |i| worker = workers[i] worker.thread = Thread.current From ec0083b5ed9d5da50ec56e29ee7219a44ca0d4ff Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 1 May 2025 15:28:25 +0900 Subject: [PATCH 13/18] 
Fix for support Ruby 3.0 and earlier. --- lib/parallel.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index d4a793d..c1d10e1 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -260,7 +260,11 @@ class << self def in_threads(options = { count: 2 }) threads = [] count, options = extract_count_from_options(options) - finished_monitor = options[:runloop] && Queue.new(1..(count - 1)) # Insert values, one less in count than the number of threads. + if options[:runloop] + # Insert values, one less in count than the number of threads. + finished_monitor = Queue.new # In Ruby 3.0 or earlier, Queue#initialize doesn't receive initial values. + (1..(count - 1)).each { |i| finished_monitor.push(i) } + end runloop_stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do From 074eab700415f5732b7523cdbec452238d80ba3e Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Thu, 1 May 2025 18:54:21 +0900 Subject: [PATCH 14/18] Add comment --- lib/parallel.rb | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/parallel.rb b/lib/parallel.rb index c1d10e1..0a1d207 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -109,6 +109,9 @@ def next queue_for_thread = Thread.current.thread_variable_get(:parallel_queue) if @worker_queues && queue_for_thread return if @stopped + # This #next method may be called from some threads at the same time. + # The main (Parallel's singleton method caller) thread calls @lambda and checks `item == Stop`, + # so it's not necessary to check for Stop here. item = runloop_enq(queue_for_thread) return if item == Stop index = @index += 1 @@ -260,6 +263,15 @@ class << self def in_threads(options = { count: 2 }) threads = [] count, options = extract_count_from_options(options) + + # Explanation: + # The queue `finished_monitor` is initialized with `count - 1` values instead of `count`. 
+ # This design ensures that all but one thread can retrieve a value from the queue by calling `finished_monitor.pop(true)`. + # The last thread will attempt to pop from the empty queue and raise a `ThreadError` exception. + # This exception triggers the rescue section where `runloop_stopper` is called, and this stops `JobFactory#consume_worker_queue`. + # By raising this exception for the last thread, we ensure that `JobFactory#stop` is called exactly once. + # Note: While multiple calls to `JobFactory#stopper` might have no side effects, this approach guarantees + # that it is invoked in a controlled and predictable manner. if options[:runloop] # Insert values, one less in count than the number of threads. finished_monitor = Queue.new # In Ruby 3.0 or earlier, Queue#initialize doesn't receive initial values. From 70208e644f1e98055fda9f9e742c958331362af3 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Sun, 4 May 2025 22:57:17 +0900 Subject: [PATCH 15/18] Rename last few runloop leftovers to worker_queue --- lib/parallel.rb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 0a1d207..997b9df 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -112,7 +112,7 @@ def next # This #next method may be called from some threads at the same time. # The main (Parallel's singleton method caller) thread calls @lambda and checks `item == Stop`, # so it's not necessary to check for Stop here. - item = runloop_enq(queue_for_thread) + item = worker_queues_enq(queue_for_thread) return if item == Stop index = @index += 1 elsif producer? @@ -184,7 +184,7 @@ def call_lambda Stop end - def runloop_enq(queue_for_thread) + def worker_queues_enq(queue_for_thread) @worker_queues.push(queue_for_thread) queue_for_thread.pop # Wait until @lambda returns. 
end @@ -268,16 +268,16 @@ def in_threads(options = { count: 2 }) # The queue `finished_monitor` is initialized with `count - 1` values instead of `count`. # This design ensures that all but one thread can retrieve a value from the queue by calling `finished_monitor.pop(true)`. # The last thread will attempt to pop from the empty queue and raise a `ThreadError` exception. - # This exception triggers the rescue section where `runloop_stopper` is called, and this stops `JobFactory#consume_worker_queue`. + # This exception triggers the rescue section where `consume_worker_queue_stopper` is called, and this stops `JobFactory#consume_worker_queue`. # By raising this exception for the last thread, we ensure that `JobFactory#stop` is called exactly once. # Note: While multiple calls to `JobFactory#stopper` might have no side effects, this approach guarantees # that it is invoked in a controlled and predictable manner. - if options[:runloop] + if options[:consume_worker_queue] # Insert values, one less in count than the number of threads. finished_monitor = Queue.new # In Ruby 3.0 or earlier, Queue#initialize doesn't receive initial values. (1..(count - 1)).each { |i| finished_monitor.push(i) } end - runloop_stopper = options[:stopper] + consume_worker_queue_stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do Thread.handle_interrupt(Exception => :immediate) do @@ -288,11 +288,11 @@ def in_threads(options = { count: 2 }) begin finished_monitor&.pop(true) # This must be executed even if the worker thread is killed (by #work_in_processes). rescue ThreadError # Queue#pop raises ThreadError when the queue is empty. - runloop_stopper&.call # Stop JobFactory#consume_worker_queue + consume_worker_queue_stopper&.call # Stop JobFactory#consume_worker_queue end end end - options[:runloop]&.call # Invoke lambda in caller thread, and provide jobs to thread queue. 
+ options[:consume_worker_queue]&.call # Invoke lambda in caller thread, and provide jobs to thread queue. threads.map(&:value) end ensure @@ -506,7 +506,7 @@ def work_in_threads(job_factory, options, &block) results_mutex = Mutex.new # arrays are not thread-safe on jRuby exception = nil - thread_options = options.merge(runloop: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) + thread_options = options.merge(consume_worker_queue: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) in_threads(thread_options) do |worker_num| Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) self.worker_number = worker_num @@ -600,7 +600,7 @@ def work_in_processes(job_factory, options, &blk) exception = nil UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do - thread_options = options.merge(runloop: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) + thread_options = options.merge(consume_worker_queue: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) in_threads(thread_options) do |i| worker = workers[i] worker.thread = Thread.current From 98a74e4320ad662eade9ae919fca362e957a796a Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Sun, 4 May 2025 23:41:09 +0900 Subject: [PATCH 16/18] Make use a mutex and a counter instead of a Queue in Parallel.in_threads --- lib/parallel.rb | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 997b9df..90a6d34 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -264,19 +264,8 @@ def in_threads(options = { count: 2 }) threads = [] count, options = extract_count_from_options(options) - # Explanation: - # The queue `finished_monitor` is initialized with `count - 1` values instead of `count`. 
- # This design ensures that all but one thread can retrieve a value from the queue by calling `finished_monitor.pop(true)`. - # The last thread will attempt to pop from the empty queue and raise a `ThreadError` exception. - # This exception triggers the rescue section where `consume_worker_queue_stopper` is called, and this stops `JobFactory#consume_worker_queue`. - # By raising this exception for the last thread, we ensure that `JobFactory#stop` is called exactly once. - # Note: While multiple calls to `JobFactory#stopper` might have no side effects, this approach guarantees - # that it is invoked in a controlled and predictable manner. - if options[:consume_worker_queue] - # Insert values, one less in count than the number of threads. - finished_monitor = Queue.new # In Ruby 3.0 or earlier, Queue#initialize doesn't receive initial values. - (1..(count - 1)).each { |i| finished_monitor.push(i) } - end + counter = count # worker thread remaining counter + mutex = options[:consume_worker_queue] ? Mutex.new : nil consume_worker_queue_stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do @@ -285,10 +274,12 @@ def in_threads(options = { count: 2 }) threads << Thread.new do yield(i) ensure - begin - finished_monitor&.pop(true) # This must be executed even if the worker thread is killed (by #work_in_processes). - rescue ThreadError # Queue#pop raises ThreadError when the queue is empty. 
- consume_worker_queue_stopper&.call # Stop JobFactory#consume_worker_queue + mutex&.synchronize do + if counter <= 1 + # Last one thread calls consume_worker_queue_stopper + consume_worker_queue_stopper.call + end + counter -= 1 end end end From f3a9dd29025b1748784ccc040500feb77b79b098 Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 12 May 2025 18:58:19 +0900 Subject: [PATCH 17/18] Update comment of Parallel.in_threads in lib/parallel.rb Co-authored-by: Michael Grosser --- lib/parallel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 90a6d34..6ceec76 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -276,7 +276,7 @@ def in_threads(options = { count: 2 }) ensure mutex&.synchronize do if counter <= 1 - # Last one thread calls consume_worker_queue_stopper + # last thread needs to stop the worker queue processing consume_worker_queue_stopper.call end counter -= 1 From 9b9f4bdff9ac51606d8123f8108e7eb8c06dbdae Mon Sep 17 00:00:00 2001 From: Takahiro Nagai <78393959+takahiro-blab@users.noreply.github.com> Date: Mon, 12 May 2025 18:59:33 +0900 Subject: [PATCH 18/18] Rename `JobFactory#consume_worker_queue` to `consume_worker_queues` Co-authored-by: Michael Grosser --- lib/parallel.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 6ceec76..8cf315d 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -133,7 +133,7 @@ def next [item, index] end - def consume_worker_queue + def consume_worker_queues return unless @worker_queues loop do @@ -265,8 +265,8 @@ def in_threads(options = { count: 2 }) count, options = extract_count_from_options(options) counter = count # worker thread remaining counter - mutex = options[:consume_worker_queue] ? Mutex.new : nil - consume_worker_queue_stopper = options[:stopper] + mutex = options[:consume_worker_queues] ? 
Mutex.new : nil + consume_worker_queues_stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do Thread.handle_interrupt(Exception => :immediate) do @@ -277,13 +277,13 @@ def in_threads(options = { count: 2 }) mutex&.synchronize do if counter <= 1 # last thread needs to stop the worker queue processing - consume_worker_queue_stopper.call + consume_worker_queues_stopper.call end counter -= 1 end end end - options[:consume_worker_queue]&.call # Invoke lambda in caller thread, and provide jobs to thread queue. + options[:consume_worker_queues]&.call # Invoke lambda in caller thread, and provide jobs to thread queue. threads.map(&:value) end ensure @@ -497,7 +497,7 @@ def work_in_threads(job_factory, options, &block) results_mutex = Mutex.new # arrays are not thread-safe on jRuby exception = nil - thread_options = options.merge(consume_worker_queue: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) + thread_options = options.merge(consume_worker_queues: job_factory.method(:consume_worker_queues), stopper: job_factory.method(:stop)) in_threads(thread_options) do |worker_num| Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) self.worker_number = worker_num @@ -591,7 +591,7 @@ def work_in_processes(job_factory, options, &blk) exception = nil UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do - thread_options = options.merge(consume_worker_queue: job_factory.method(:consume_worker_queue), stopper: job_factory.method(:stop)) + thread_options = options.merge(consume_worker_queues: job_factory.method(:consume_worker_queues), stopper: job_factory.method(:stop)) in_threads(thread_options) do |i| worker = workers[i] worker.thread = Thread.current