Skip to content

Commit 364e5bc

Browse files
authored
feat: add an option to the pipeline feature to be able to select either throwing an error or returning errors as is (#343)
1 parent 23fd884 commit 364e5bc

File tree

3 files changed

+48
-12
lines changed

3 files changed

+48
-12
lines changed

lib/redis_client/cluster.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,16 @@ def zscan(key, *args, **kwargs, &block)
8282
@router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
8383
end
8484

85-
def pipelined
85+
def pipelined(exception: true)
8686
seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
87-
pipeline = ::RedisClient::Cluster::Pipeline.new(@router, @command_builder, @concurrent_worker, seed: seed)
87+
pipeline = ::RedisClient::Cluster::Pipeline.new(
88+
@router,
89+
@command_builder,
90+
@concurrent_worker,
91+
exception: exception,
92+
seed: seed
93+
)
94+
8895
yield pipeline
8996
return [] if pipeline.empty?
9097

lib/redis_client/cluster/pipeline.rb

+17-10
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class Pipeline
1212
class Extended < ::RedisClient::Pipeline
1313
attr_reader :outer_indices
1414

15-
def initialize(command_builder)
15+
def initialize(...)
1616
super
1717
@outer_indices = nil
1818
end
@@ -50,14 +50,14 @@ def get_block(inner_index)
5050
end
5151

5252
::RedisClient::ConnectionMixin.module_eval do
53-
def call_pipelined_aware_of_redirection(commands, timeouts) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
53+
def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
5454
size = commands.size
5555
results = Array.new(commands.size)
5656
@pending_reads += size
5757
write_multi(commands)
5858

5959
redirection_indices = nil
60-
exception = nil
60+
first_exception = nil
6161
size.times do |index|
6262
timeout = timeouts && timeouts[index]
6363
result = read(timeout)
@@ -67,14 +67,14 @@ def call_pipelined_aware_of_redirection(commands, timeouts) # rubocop:disable Me
6767
if result.is_a?(::RedisClient::CommandError) && result.message.start_with?('MOVED', 'ASK')
6868
redirection_indices ||= []
6969
redirection_indices << index
70-
else
71-
exception ||= result
70+
elsif exception
71+
first_exception ||= result
7272
end
7373
end
7474
results[index] = result
7575
end
7676

77-
raise exception if exception
77+
raise first_exception if exception && first_exception
7878
return results if redirection_indices.nil?
7979

8080
err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
@@ -98,10 +98,11 @@ class RedirectionNeeded < ::RedisClient::Error
9898
attr_accessor :replies, :indices
9999
end
100100

101-
def initialize(router, command_builder, concurrent_worker, seed: Random.new_seed)
101+
def initialize(router, command_builder, concurrent_worker, exception:, seed: Random.new_seed)
102102
@router = router
103103
@command_builder = command_builder
104104
@concurrent_worker = concurrent_worker
105+
@exception = exception
105106
@seed = seed
106107
@pipelines = nil
107108
@size = 0
@@ -212,7 +213,7 @@ def send_pipeline(client, pipeline)
212213
results = client.ensure_connected_cluster_scoped(retryable: pipeline._retryable?) do |connection|
213214
commands = pipeline._commands
214215
client.middlewares.call_pipelined(commands, client.config) do
215-
connection.call_pipelined_aware_of_redirection(commands, pipeline._timeouts)
216+
connection.call_pipelined_aware_of_redirection(commands, pipeline._timeouts, exception: @exception)
216217
end
217218
end
218219

@@ -224,15 +225,21 @@ def handle_redirection(err, pipeline, inner_index)
224225

225226
if err.message.start_with?('MOVED')
226227
node = @router.assign_redirection_node(err.message)
227-
redirect_command(node, pipeline, inner_index)
228+
try_redirection(node, pipeline, inner_index)
228229
elsif err.message.start_with?('ASK')
229230
node = @router.assign_asking_node(err.message)
230-
try_asking(node) ? redirect_command(node, pipeline, inner_index) : err
231+
try_asking(node) ? try_redirection(node, pipeline, inner_index) : err
231232
else
232233
err
233234
end
234235
end
235236

237+
def try_redirection(node, pipeline, inner_index)
238+
redirect_command(node, pipeline, inner_index)
239+
rescue StandardError => e
240+
@exception ? raise : e
241+
end
242+
236243
def redirect_command(node, pipeline, inner_index)
237244
method = pipeline.get_callee_method(inner_index)
238245
command = pipeline.get_command(inner_index)

test/redis_client/test_cluster.rb

+22
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,28 @@ def test_pipelined_with_errors
170170
10.times { |i| assert_equal((i + 10).to_s, @client.call('GET', "string#{i}")) }
171171
end
172172

173+
def test_pipelined_with_errors_as_is
174+
got = @client.pipelined(exception: false) do |pipeline|
175+
10.times do |i|
176+
pipeline.call('SET', "string#{i}", i)
177+
pipeline.call('SET', "string#{i}", i, 'too many args')
178+
pipeline.call('SET', "string#{i}", i + 10)
179+
end
180+
end
181+
182+
assert_equal(30, got.size)
183+
184+
10.times do |i|
185+
assert_equal('OK', got[(3 * i) + 0])
186+
assert_instance_of(::RedisClient::CommandError, got[(3 * i) + 1])
187+
assert_equal('OK', got[(3 * i) + 2])
188+
end
189+
190+
wait_for_replication
191+
192+
10.times { |i| assert_equal((i + 10).to_s, @client.call('GET', "string#{i}")) }
193+
end
194+
173195
def test_pipelined_with_many_commands
174196
@client.pipelined { |pi| 1000.times { |i| pi.call('SET', i, i) } }
175197
wait_for_replication

0 commit comments

Comments
 (0)