Skip to content

Commit b9b3b71

Browse files
authored
chore: patch some minor changes (#395)
1 parent ef518f6 commit b9b3b71

File tree

7 files changed

+144
-119
lines changed

7 files changed

+144
-119
lines changed

.github/workflows/test.yaml

+7-7
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,18 @@ jobs:
3232
- {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
3333
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
3434
- {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'}
35-
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
36-
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
37-
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
38-
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
39-
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
35+
- {task: test_cluster_down}
36+
- {task: test_cluster_broken, restart: 'no', startup: '6'}
4037
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
4138
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
4239
- {redis: '7.0', ruby: '3.1'}
4340
- {redis: '6.2', ruby: '3.0'}
4441
- {redis: '5.0', ruby: '2.7'}
45-
- {task: test_cluster_down}
46-
- {task: test_cluster_broken, restart: 'no', startup: '6'}
42+
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
43+
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
44+
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
45+
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
46+
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
4747
- {ruby: 'jruby'}
4848
- {ruby: 'truffleruby'}
4949
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}

bin/pubsub

+36-30
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,57 @@ require 'bundler/setup'
55
require 'redis_cluster_client'
66

77
module PubSubDebug
8+
WAIT_SEC = 2.0
9+
810
module_function
911

10-
def spawn_publisher(cli, chan)
11-
Thread.new(cli, chan) do |r, c|
12-
role = ' Publisher'
12+
def spawn_publisher(client, channel)
13+
Thread.new(client, channel) do |cli, chan|
14+
role = 'Publisher'
1315
i = 0
1416

1517
loop do
1618
handle_errors(role) do
17-
msg = format('%05d', i)
18-
r.call('spublish', c, msg)
19-
log "#{role}: sent: #{msg}"
19+
cli.call('spublish', chan, i)
20+
log(role, :spublish, chan, i)
2021
i += 1
2122
end
2223
ensure
23-
sleep 1.0
24+
sleep WAIT_SEC
2425
end
2526
rescue StandardError => e
26-
log "#{role}: dead: #{e.class}: #{e.message}"
27+
log(role, :dead, e.class, e.message)
2728
raise
2829
end
2930
end
3031

31-
def spawn_subscriber(cli, chan) # rubocop:disable Metrics/AbcSize
32-
Thread.new(cli, chan) do |r, c|
32+
def spawn_subscriber(client, channel) # rubocop:disable Metrics/AbcSize
33+
Thread.new(client, channel) do |cli, chan|
3334
role = 'Subscriber'
3435
ps = nil
3536

3637
loop do
37-
ps = r.pubsub
38-
ps.call('ssubscribe', c)
39-
log "#{role}: done: subscription started to #{c}"
38+
ps = cli.pubsub
39+
ps.call('ssubscribe', chan)
4040
break
4141
rescue StandardError => e
42-
log "#{role}: init: #{e.class}: #{e.message}"
42+
log(role, :init, e.class, e.message)
4343
ps&.close
4444
ensure
45-
sleep 1.0
45+
sleep WAIT_SEC
4646
end
4747

4848
loop do
4949
handle_errors('Subscriber') do
50-
e = ps.next_event(0.01)
51-
log "#{role}: recv: #{e.nil? ? 'nil' : e}"
52-
ps.call('ssubscribe', c) if !e.nil? && e.first == 'sunsubscribe'
50+
event = ps.next_event(WAIT_SEC)
51+
log(role, *event) unless event.nil?
52+
case event&.first
53+
when 'sunsubscribe' then ps.call('ssubscribe', chan)
54+
end
5355
end
54-
ensure
55-
sleep 1.0
5656
end
5757
rescue StandardError, SignalException => e
58-
log "#{role}: dead: #{e.class}: #{e.message}"
58+
log(role, :dead, e.class, e.message)
5959
ps&.close
6060
raise
6161
end
@@ -64,23 +64,26 @@ module PubSubDebug
6464
def handle_errors(role)
6565
yield
6666
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
67-
log "#{role}: recv: #{e.class}"
67+
log(role, e.class)
6868
rescue RedisClient::CommandError => e
69-
log "#{role}: recv: #{e.class}: #{e.message}"
69+
log(role, e.class, e.message)
7070
raise unless e.message.start_with?('CLUSTERDOWN')
7171
rescue StandardError => e
72-
log "#{role}: recv: #{e.class}: #{e.message}"
72+
log(role, e.class, e.message)
7373
raise
7474
end
7575

76-
def log(msg)
77-
print "#{msg}\n"
76+
def log(*texts)
77+
return if texts.nil? || texts.empty?
78+
79+
message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ')
80+
print "#{message}\n"
7881
end
7982
end
8083

81-
clients = Array.new(2) { RedisClient.cluster(connect_with_original_config: true).new_client }
84+
nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze
85+
clients = Array.new(6) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze
8286
threads = []
83-
channel = 'chan1'
8487

8588
Signal.trap(:INT) do
8689
threads.each(&:exit)
@@ -89,6 +92,9 @@ Signal.trap(:INT) do
8992
exit 0
9093
end
9194

92-
threads << PubSubDebug.spawn_subscriber(clients[0], channel)
93-
threads << PubSubDebug.spawn_publisher(clients[1], channel)
95+
%w[chan1 chan2 chan3].each_with_index do |channel, i|
96+
threads << PubSubDebug.spawn_subscriber(clients[i], channel)
97+
threads << PubSubDebug.spawn_publisher(clients[i + 3], channel)
98+
end
99+
94100
threads.each(&:join)

bin/singlepiptx

+50-36
Original file line numberDiff line numberDiff line change
@@ -5,96 +5,101 @@ require 'bundler/setup'
55
require 'redis_cluster_client'
66

77
module SinglePipTxDebug
8+
WAIT_SEC = 2.0
9+
810
module_function
911

10-
def spawn_single(cli)
11-
Thread.new(cli) do |r|
12-
role = ' Single'
12+
def spawn_single(client, key)
13+
Thread.new(client, key) do |cli, k|
14+
role = 'Single'
1315

1416
loop do
1517
handle_errors(role) do
16-
reply = r.call('incr', 'single')
17-
log "#{role}: #{reply}"
18+
reply = cli.call('incr', k)
19+
log(role, k, reply)
1820
end
1921
ensure
20-
sleep 1.0
22+
sleep WAIT_SEC
2123
end
2224
rescue StandardError => e
23-
log "#{role}: dead: #{e.class}: #{e.message}"
25+
log(role, :dead, e.class, e.message)
2426
raise
2527
end
2628
end
2729

28-
def spawn_pipeline(cli)
29-
Thread.new(cli) do |r|
30-
role = ' Pipeline'
30+
def spawn_pipeline(client, key)
31+
Thread.new(client, key) do |cli, k|
32+
role = 'Pipeline'
3133

3234
loop do
3335
handle_errors(role) do
34-
reply = r.pipelined do |pi|
35-
pi.call('incr', 'pipeline')
36-
pi.call('incr', 'pipeline')
36+
reply = cli.pipelined do |pi|
37+
pi.call('incr', k)
38+
pi.call('incr', k)
3739
end
3840

39-
log "#{role}: #{reply}"
41+
log(role, k, reply.last)
4042
end
4143
ensure
42-
sleep 1.0
44+
sleep WAIT_SEC
4345
end
4446
rescue StandardError => e
45-
log "#{role}: dead: #{e.class}: #{e.message}"
47+
log(role, :dead, e.class, e.message)
4648
raise
4749
end
4850
end
4951

50-
def spawn_transaction(cli)
51-
Thread.new(cli) do |r|
52+
def spawn_transaction(client, key)
53+
Thread.new(client, key) do |cli, k|
5254
role = 'Transaction'
5355
i = 0
5456

5557
loop do
5658
handle_errors(role) do
57-
reply = r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx|
58-
i += 1
59-
tx.call('incr', 'transaction')
60-
tx.call('incr', 'transaction')
61-
tx.call('incr', 'transaction')
59+
reply = cli.multi(watch: i.odd? ? [k] : nil) do |tx|
60+
tx.call('incr', k)
61+
tx.call('incr', k)
6262
end
6363

64-
log "#{role}: #{reply}"
64+
log(role, k, reply.last)
65+
i += 1
6566
end
6667
ensure
67-
sleep 1.0
68+
sleep WAIT_SEC
6869
end
6970
rescue StandardError => e
70-
log "#{role}: dead: #{e.class}: #{e.message}"
71+
log(role, :dead, e.class, e.message)
7172
raise
7273
end
7374
end
7475

7576
def handle_errors(role) # rubocop:disable Metrics/AbcSize
7677
yield
7778
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
78-
log "#{role}: #{e.class}"
79+
log(role, e.class)
7980
rescue RedisClient::CommandError => e
80-
log "#{role}: #{e.class}: #{e.message}"
81+
log(role, e.class, e.message)
8182
raise unless e.message.start_with?('CLUSTERDOWN')
8283
rescue RedisClient::Cluster::ErrorCollection => e
83-
log "#{role}: #{e.class}: #{e.message}"
84+
log(role, e.class, e.message)
8485
raise unless e.errors.values.all? do |err|
8586
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
8687
end
8788
rescue StandardError => e
88-
log "#{role}: #{e.class}: #{e.message}"
89+
log(role, e.class, e.message)
8990
raise
9091
end
9192

92-
def log(msg)
93-
print "#{msg}\n"
93+
def log(*texts)
94+
return if texts.nil? || texts.empty?
95+
96+
message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ')
97+
print "#{message}\n"
9498
end
9599
end
96100

97-
clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client }
101+
nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze
102+
clients = Array.new(9) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze
98103
threads = []
99104

100105
Signal.trap(:INT) do
@@ -104,7 +109,16 @@ Signal.trap(:INT) do
104109
exit 0
105110
end
106111

107-
threads << SinglePipTxDebug.spawn_single(clients[0])
108-
threads << SinglePipTxDebug.spawn_pipeline(clients[1])
109-
threads << SinglePipTxDebug.spawn_transaction(clients[2])
112+
%w[single1 single3 single4].each_with_index do |key, i|
113+
threads << SinglePipTxDebug.spawn_single(clients[i], key)
114+
end
115+
116+
%w[pipeline1 pipeline2 pipeline4].each_with_index do |key, i|
117+
threads << SinglePipTxDebug.spawn_pipeline(clients[i + 3], key)
118+
end
119+
120+
%w[transaction1 transaction3 transaction4].each_with_index do |key, i|
121+
threads << SinglePipTxDebug.spawn_transaction(clients[i + 6], key)
122+
end
123+
110124
threads.each(&:join)

lib/redis_client/cluster.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ class Cluster
1515

1616
attr_reader :config
1717

18-
def initialize(config, pool: nil, concurrency: nil, **kwargs)
19-
@config = config
18+
def initialize(config = nil, pool: nil, concurrency: nil, **kwargs)
19+
@config = config.nil? ? ClusterConfig.new(**kwargs) : config
2020
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
21-
@command_builder = config.command_builder
21+
@command_builder = @config.command_builder
2222

2323
@pool = pool
2424
@kwargs = kwargs

lib/redis_client/cluster/node.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M
309309
work_group.push(i, raw_client) do |client|
310310
regular_timeout = client.read_timeout
311311
client.read_timeout = @config.slow_command_timeout > 0.0 ? @config.slow_command_timeout : regular_timeout
312-
reply = client.call('CLUSTER', 'NODES')
312+
reply = client.call_once('CLUSTER', 'NODES')
313313
client.read_timeout = regular_timeout
314314
parse_cluster_node_reply(reply)
315315
rescue StandardError => e

test/redis_client/test_cluster.rb

+3-2
Original file line numberDiff line numberDiff line change
@@ -892,8 +892,9 @@ def wait_for_replication
892892
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
893893
swap_timeout(@client, timeout: 0.1) do |client|
894894
client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
895-
rescue RedisClient::ConnectionError
896-
# ignore
895+
rescue RedisClient::Cluster::ErrorCollection => e
896+
# FIXME: flaky in jruby on #test_pubsub_with_wrong_command
897+
raise unless e.errors.values.all? { |err| err.is_a?(::RedisClient::ConnectionError) }
897898
end
898899
end
899900

0 commit comments

Comments
 (0)