@@ -20,6 +20,7 @@ local string_find = string.find
20
20
local redis_crc = xmodem .redis_crc
21
21
22
22
local DEFAULT_SHARED_DICT_NAME = " redis_cluster_slot_locks"
23
+ local DEFAULT_REFRESH_DICT_NAME = " refresh_lock"
23
24
local DEFAULT_MAX_REDIRECTION = 5
24
25
local DEFAULT_MAX_CONNECTION_ATTEMPTS = 3
25
26
local DEFAULT_KEEPALIVE_TIMEOUT = 55000
@@ -97,22 +98,32 @@ local function split(s, delimiter)
97
98
end
98
99
99
100
local function try_hosts_slots (self , serv_list )
101
+ local start_time = ngx .now ()
100
102
local errors = {}
101
103
local config = self .config
102
104
if # serv_list < 1 then
103
105
return nil , " failed to fetch slots, serv_list config is empty"
104
106
end
107
+
105
108
for i = 1 , # serv_list do
106
109
local ip = serv_list [i ].ip
107
110
local port = serv_list [i ].port
108
111
local redis_client = redis :new ()
109
- local ok , err
112
+ local ok , err , max_connection_timeout_err
110
113
redis_client :set_timeouts (config .connect_timeout or DEFAULT_CONNECTION_TIMEOUT ,
111
114
config .send_timeout or DEFAULT_SEND_TIMEOUT ,
112
115
config .read_timeout or DEFAULT_READ_TIMEOUT )
113
116
114
117
-- attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis
115
118
for k = 1 , config .max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do
119
+ local total_connection_time_ms = (ngx .now () - start_time ) * 1000
120
+ if (config .max_connection_timeout and total_connection_time_ms > config .max_connection_timeout ) then
121
+ max_connection_timeout_err = " max_connection_timeout of " .. config .max_connection_timeout .. " ms reached."
122
+ ngx .log (ngx .ERR , max_connection_timeout_err )
123
+ table_insert (errors , max_connection_timeout_err )
124
+ break
125
+ end
126
+
116
127
ok , err = redis_client :connect (ip , port , self .config .connect_opts )
117
128
if ok then break end
118
129
if err then
@@ -180,12 +191,14 @@ local function try_hosts_slots(self, serv_list)
180
191
table_insert (errors , nerr )
181
192
end
182
193
release_connection (redis_client , config )
183
-
194
+
184
195
-- refresh of slots and master nodes successful
185
196
-- not required to connect/iterate over additional hosts
186
197
if nodes_res and slots_info then
187
198
return true , nil
188
199
end
200
+ elseif max_connection_timeout_err then
201
+ break
189
202
else
190
203
table_insert (errors , err )
191
204
end
@@ -201,12 +214,19 @@ function _M.fetch_slots(self)
201
214
local serv_list = self .config .serv_list
202
215
local serv_list_cached = slot_cache [self .config .name .. " serv_list" ]
203
216
204
- local serv_list_combined = {}
217
+ local serv_list_combined
205
218
206
- -- if a cached serv_list is present, use it
219
+ -- if a cached serv_list is present, start with that
207
220
if serv_list_cached then
208
221
serv_list_combined = serv_list_cached .serv_list
222
+
223
+ -- then append the serv_list from config, in the event that the entire
224
+ -- cached serv_list no longer points to anything usable
225
+ for _ , s in ipairs (serv_list ) do
226
+ table_insert (serv_list_combined , s )
227
+ end
209
228
else
229
+ -- otherwise we bootstrap with our serv_list from config
210
230
serv_list_combined = serv_list
211
231
end
212
232
@@ -221,6 +241,30 @@ function _M.fetch_slots(self)
221
241
end
222
242
223
243
244
+ function _M .refresh_slots (self )
245
+ local worker_id = ngx .worker .id ()
246
+ local lock , err , elapsed , ok
247
+ lock , err = resty_lock :new (self .config .dict_name or DEFAULT_SHARED_DICT_NAME , {time_out = 0 })
248
+ if not lock then
249
+ ngx .log (ngx .ERR , " failed to create lock in refresh slot cache: " , err )
250
+ return nil , err
251
+ end
252
+
253
+ local refresh_lock_key = (self .config .refresh_lock_key or DEFAULT_REFRESH_DICT_NAME ) .. worker_id
254
+ elapsed , err = lock :lock (refresh_lock_key )
255
+ if not elapsed then
256
+ return nil , ' race refresh lock fail, ' .. err
257
+ end
258
+
259
+ self :fetch_slots ()
260
+ ok , err = lock :unlock ()
261
+ if not ok then
262
+ ngx .log (ngx .ERR , " failed to unlock in refresh slot cache:" , err )
263
+ return nil , err
264
+ end
265
+ end
266
+
267
+
224
268
function _M .init_slots (self )
225
269
if slot_cache [self .config .name ] then
226
270
-- already initialized
@@ -396,7 +440,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
396
440
ip , port , slave , err = pick_node (self , serv_list , slot )
397
441
if err then
398
442
ngx .log (ngx .ERR , " pickup node failed, will return failed for this request, meanwhile refereshing slotcache " .. err )
399
- self :fetch_slots ()
443
+ self :refresh_slots ()
400
444
return nil , err
401
445
end
402
446
end
@@ -416,7 +460,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
416
460
-- set readonly
417
461
ok , err = redis_client :readonly ()
418
462
if not ok then
419
- self :fetch_slots ()
463
+ self :refresh_slots ()
420
464
return nil , err
421
465
end
422
466
end
@@ -425,7 +469,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
425
469
-- executing asking
426
470
ok , err = redis_client :asking ()
427
471
if not ok then
428
- self :fetch_slots ()
472
+ self :refresh_slots ()
429
473
return nil , err
430
474
end
431
475
end
@@ -445,7 +489,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
445
489
release_connection (redis_client , config )
446
490
target_ip = nil
447
491
target_port = nil
448
- self :fetch_slots ()
492
+ self :refresh_slots ()
449
493
need_to_retry = true
450
494
451
495
elseif string.sub (err , 1 , 3 ) == " ASK" then
@@ -468,7 +512,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
468
512
return nil , " Cannot executing command, cluster status is failed!"
469
513
else
470
514
-- There might be node fail, we should also refresh slot cache
471
- self :fetch_slots ()
515
+ self :refresh_slots ()
472
516
return nil , err
473
517
end
474
518
end
@@ -478,7 +522,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
478
522
end
479
523
else
480
524
-- There might be node fail, we should also refresh slot cache
481
- self :fetch_slots ()
525
+ self :refresh_slots ()
482
526
if k == config .max_redirection or k == DEFAULT_MAX_REDIRECTION then
483
527
-- only return after allowing for `k` attempts
484
528
return nil , connerr
@@ -561,7 +605,7 @@ local function construct_final_pipeline_resp(self, node_res_map, node_req_map)
561
605
-- ngx.log(ngx.NOTICE, "handle moved signal for cmd:" .. reqs[i]["cmd"] .. " key:" .. reqs[i]["key"])
562
606
if need_to_fetch_slots then
563
607
-- if there is multiple signal for moved, we just need to fetch slot cache once, and do retry.
564
- self :fetch_slots ()
608
+ self :refresh_slots ()
565
609
need_to_fetch_slots = false
566
610
end
567
611
local movedres , err = handle_command_with_retry (self , nil , nil , false , reqs [i ][" cmd" ], reqs [i ][" key" ], unpack (reqs [i ][" args" ]))
@@ -634,7 +678,7 @@ function _M.commit_pipeline(self)
634
678
-- We must empty local reference to slots cache, otherwise there will be memory issue while
635
679
-- coroutine swich happens(eg. ngx.sleep, cosocket), very important!
636
680
slots = nil
637
- self :fetch_slots ()
681
+ self :refresh_slots ()
638
682
return nil , err
639
683
end
640
684
@@ -671,7 +715,7 @@ function _M.commit_pipeline(self)
671
715
-- set readonly
672
716
local ok , err = redis_client :readonly ()
673
717
if not ok then
674
- self :fetch_slots ()
718
+ self :refresh_slots ()
675
719
return nil , err
676
720
end
677
721
end
@@ -692,7 +736,7 @@ function _M.commit_pipeline(self)
692
736
local res , err = redis_client :commit_pipeline ()
693
737
if err then
694
738
-- There might be node fail, we should also refresh slot cache
695
- self :fetch_slots ()
739
+ self :refresh_slots ()
696
740
return nil , err .. " return from " .. tostring (ip ) .. " :" .. tostring (port )
697
741
end
698
742
@@ -703,7 +747,7 @@ function _M.commit_pipeline(self)
703
747
node_res_map [k ] = res
704
748
else
705
749
-- There might be node fail, we should also refresh slot cache
706
- self :fetch_slots ()
750
+ self :refresh_slots ()
707
751
return nil , err .. " pipeline commit failed while connecting to " .. tostring (ip ) .. " :" .. tostring (port )
708
752
end
709
753
end
0 commit comments