diff --git a/Readme.md b/Readme.md index 8168b89..c389721 100644 --- a/Readme.md +++ b/Readme.md @@ -46,6 +46,22 @@ items = [1,2,3] Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... } ``` +Iterations can be stopped by raising StopIteration. + +```Ruby +items = [1,2,3] +Parallel.each( -> { items.pop || raise(StopIteration) }) { |number| ... } +``` + +Also supports Enumerator instances as a source. + +```Ruby +enumerator = Enumerator.new do |y| + y << 1; y << 2; y << 3 +end +Parallel.each( enumerator ) { |number| ... } +``` + Also supports `any?` or `all?` ```Ruby diff --git a/lib/parallel.rb b/lib/parallel.rb index d0c3f37..8cf315d 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -97,20 +97,30 @@ def wait class JobFactory def initialize(source, mutex) - @lambda = (source.respond_to?(:call) && source) || queue_wrapper(source) - @source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array + @lambda = enum_wrapper(source) || (source.respond_to?(:call) && source) || queue_wrapper(source) + @source = source.to_a unless @lambda # turn non-Enumerable-s into an Array + @worker_queues = Thread::Queue.new if @lambda @mutex = mutex @index = -1 @stopped = false end def next - if producer? + queue_for_thread = Thread.current.thread_variable_get(:parallel_queue) + if @worker_queues && queue_for_thread + return if @stopped + # This #next method may be called from some threads at the same time. + # The main (Parallel's singleton method caller) thread calls @lambda and checks `item == Stop`, + # so it's not necessary to check for Stop here. + item = worker_queues_enq(queue_for_thread) + return if item == Stop + index = @index += 1 + elsif producer? # - index and item stay in sync # - do not call lambda after it has returned Stop item, index = @mutex.synchronize do return if @stopped - item = @lambda.call + item = call_lambda @stopped = (item == Stop) return if @stopped [item, @index += 1] @@ -123,6 +133,26 @@ def next [item, index] end + def consume_worker_queues + return unless @worker_queues + + loop do + queue = @worker_queues.pop + return if queue == Stop + item = call_lambda + queue.push(item) + break if item == Stop + end + @stopped = true + # clear out all work queues by adding a "stop" to them which will stop the thread working on them + begin + while queue = @worker_queues.pop(true) + queue.push(Stop) if queue != Stop # Unlock waiting threads. + end + rescue ThreadError # All threads are unlocked. + end + end + def size if producer? Float::INFINITY @@ -142,8 +172,23 @@ def unpack(data) producer? ? data : [@source[data], data] end + def stop + @worker_queues&.push(Stop) + end + private + def call_lambda + @lambda.call + rescue StopIteration + Stop + end + + def worker_queues_enq(queue_for_thread) + @worker_queues.push(queue_for_thread) + queue_for_thread.pop # Wait until @lambda returns. + end + def producer? @lambda end @@ -151,6 +196,12 @@ def producer? def queue_wrapper(array) array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) } end + + # returns the `next` method if the source in an enumerator and cannot be accessed by anything more efficient like [] + def enum_wrapper(source) + # Convert what is inaccessible by the index + !source.respond_to?(:[]) && source.respond_to?(:next) && source.method(:next) + end end class UserInterruptHandler @@ -211,13 +262,28 @@ def restore_interrupt(old, signal) class << self def in_threads(options = { count: 2 }) threads = [] - count, = extract_count_from_options(options) + count, options = extract_count_from_options(options) + + counter = count # worker thread remaining counter + mutex = options[:consume_worker_queues] ? Mutex.new : nil + consume_worker_queues_stopper = options[:stopper] Thread.handle_interrupt(Exception => :never) do Thread.handle_interrupt(Exception => :immediate) do count.times do |i| - threads << Thread.new { yield(i) } + threads << Thread.new do + yield(i) + ensure + mutex&.synchronize do + if counter <= 1 + # last thread needs to stop the worker queue processing + consume_worker_queues_stopper.call + end + counter -= 1 + end + end end + options[:consume_worker_queues]&.call # Invoke lambda in caller thread, and provide jobs to thread queue. threads.map(&:value) end ensure @@ -431,7 +497,9 @@ def work_in_threads(job_factory, options, &block) results_mutex = Mutex.new # arrays are not thread-safe on jRuby exception = nil - in_threads(options) do |worker_num| + thread_options = options.merge(consume_worker_queues: job_factory.method(:consume_worker_queues), stopper: job_factory.method(:stop)) + in_threads(thread_options) do |worker_num| + Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) self.worker_number = worker_num # as long as there are more jobs, work on one of them while !exception && (set = job_factory.next) @@ -523,9 +591,11 @@ def work_in_processes(job_factory, options, &blk) exception = nil UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do - in_threads(options) do |i| + thread_options = options.merge(consume_worker_queues: job_factory.method(:consume_worker_queues), stopper: job_factory.method(:stop)) + in_threads(thread_options) do |i| worker = workers[i] worker.thread = Thread.current + Thread.current.thread_variable_set(:parallel_queue, Thread::Queue.new) worked = false begin diff --git a/spec/cases/infinite_sequence.rb b/spec/cases/infinite_sequence.rb new file mode 100644 index 0000000..14a393a --- /dev/null +++ b/spec/cases/infinite_sequence.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +# Reproduction case based on GitHub Issue #211 +# Original code provided by @cyclotron3k in the issue +# using a enum that is infinite, so this will hang forever when trying to convert to an array + +require 'prime' +require './spec/cases/helper' + +private_key = 12344567899 + +results = [] + +[{ in_threads: 2 }, { in_threads: 0 }].each do |options| + primes = Prime.to_enum + Parallel.each(primes, options) do |prime| + if private_key % prime == 0 + results << prime.to_s + raise Parallel::Break + end + end +end + +print results.join(',') diff --git a/spec/cases/lambda_call_same_thread.rb b/spec/cases/lambda_call_same_thread.rb new file mode 100644 index 0000000..a1a7719 --- /dev/null +++ b/spec/cases/lambda_call_same_thread.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +runner_thread = nil +all = [3, 2, 1] +my_proc = proc { + runner_thread ||= Thread.current + if Thread.current != runner_thread + raise "proc is called in different thread!" + end + + all.pop || Parallel::Stop +} + +class Callback + def self.call(x) + $stdout.sync = true + "ITEM-#{x}" + end +end +puts(Parallel.map(my_proc, in_threads: 2) { |(i, _id)| Callback.call i }) diff --git a/spec/cases/lambda_can_stop_by_exception.rb b/spec/cases/lambda_can_stop_by_exception.rb new file mode 100644 index 0000000..da8b59e --- /dev/null +++ b/spec/cases/lambda_can_stop_by_exception.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +def generate_proc + count = 0 + proc { + raise StopIteration if 3 <= count + count += 1 + } +end + +class Callback + def self.call(x) + $stdout.sync = true + "ITEM-#{x}" + end +end + +[{ in_processes: 2 }, { in_threads: 2 }, { in_threads: 0 }].each do |options| + puts(Parallel.map(generate_proc, options) { |(i, _id)| Callback.call i }) +end diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 1589f0c..e6835b0 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -722,6 +722,18 @@ def cpus end end + it "can process infinite sequence enumerator" do + ruby("spec/cases/infinite_sequence.rb").split(',').should == ['139'] * 2 + end + + it "can be finished by lambda raising StopIteration" do + ruby("spec/cases/lambda_can_stop_by_exception.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" * 3 + end + + it "must call lambda in same thread" do + ruby("spec/cases/lambda_call_same_thread.rb").should == "ITEM-1\nITEM-2\nITEM-3\n" + end + it "fails when running with a prefilled queue without stop since there are no threads to fill it" do error = (RUBY_VERSION >= "2.0.0" ? "No live threads left. Deadlock?" : "deadlock detected (fatal)") ruby("spec/cases/fatal_queue.rb 2>&1").should include error