Skip to content

Commit afd2a43

Browse files
authored
fix: broken behavior for Pub/Sub (#220)
1 parent 256c7df commit afd2a43

File tree

2 files changed

+14
-19
lines changed

2 files changed

+14
-19
lines changed

lib/redis_client/cluster/pub_sub.rb

+8-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ def initialize(router, command_builder)
99
@router = router
1010
@command_builder = command_builder
1111
@pubsub_states = {}
12+
@messages = []
1213
end
1314

1415
def call(*args, **kwargs)
@@ -22,15 +23,15 @@ def call_v(command)
2223
def close
2324
@pubsub_states.each_value(&:close)
2425
@pubsub_states.clear
26+
@messages.clear
2527
end
2628

2729
def next_event(timeout = nil)
2830
return if @pubsub_states.empty?
31+
return @messages.shift unless @messages.empty?
2932

30-
msgs = collect_messages(timeout).compact
31-
return msgs.first if msgs.size < 2
32-
33-
msgs
33+
collect_messages(timeout)
34+
@messages.shift
3435
end
3536

3637
private
@@ -45,8 +46,8 @@ def _call(command)
4546
pubsub.call_v(command)
4647
end
4748

48-
def collect_messages(timeout) # rubocop:disable Metrics/AbcSize
49-
@pubsub_states.each_slice(MAX_THREADS).each_with_object([]) do |chuncked_pubsub_states, acc|
49+
def collect_messages(timeout)
50+
@pubsub_states.each_slice(MAX_THREADS) do |chuncked_pubsub_states|
5051
threads = chuncked_pubsub_states.map do |_, v|
5152
Thread.new(v) do |pubsub|
5253
Thread.current[:reply] = pubsub.next_event(timeout)
@@ -57,7 +58,7 @@ def collect_messages(timeout) # rubocop:disable Metrics/AbcSize
5758

5859
threads.each do |t|
5960
t.join
60-
acc << t[:reply] unless t[:reply].nil?
61+
@messages << t[:reply] unless t[:reply].nil?
6162
end
6263
end
6364
end

test/redis_client/test_cluster.rb

+6-12
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,8 @@ def test_global_pubsub_with_multiple_channels
214214

215215
sub = Fiber.new do |pubsub|
216216
pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" })
217-
assert_equal(
218-
Array.new(10) { |i| ['subscribe', "g-chan#{i}", i + 1] },
219-
collect_messages(pubsub).sort_by { |e| e[1].to_s }
220-
)
217+
got = collect_messages(pubsub).sort_by { |e| e[1].to_s }
218+
10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) }
221219
Fiber.yield
222220
Fiber.yield(collect_messages(pubsub))
223221
pubsub.call('UNSUBSCRIBE')
@@ -229,10 +227,8 @@ def test_global_pubsub_with_multiple_channels
229227
cli.pipelined { |pi| 10.times { |i| pi.call('PUBLISH', "g-chan#{i}", i) } }
230228
end
231229

232-
assert_equal(
233-
Array.new(10) { |i| ['message', "g-chan#{i}", i.to_s] },
234-
sub.resume.sort_by { |e| e[1].to_s }
235-
)
230+
got = sub.resume.sort_by { |e| e[1].to_s }
231+
10.times { |i| assert_equal(['message', "g-chan#{i}", i.to_s], got[i]) }
236232
end
237233

238234
def test_sharded_pubsub
@@ -285,10 +281,8 @@ def test_sharded_pubsub_with_multiple_channels
285281
cli.pipelined { |pi| 10.times { |i| pi.call('SPUBLISH', "s-chan#{i}", i) } }
286282
end
287283

288-
assert_equal(
289-
Array.new(10) { |i| ['smessage', "s-chan#{i}", i.to_s] },
290-
sub.resume.sort_by { |e| e[1].to_s }
291-
)
284+
got = sub.resume.sort_by { |e| e[1].to_s }
285+
10.times { |i| assert_equal(['smessage', "s-chan#{i}", i.to_s], got[i]) }
292286
end
293287

294288
def test_close

0 commit comments

Comments
 (0)