From 9fdc7c89507e1499c93b8b8e98870828d76efbfb Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sun, 18 Feb 2024 15:06:33 +0900 Subject: [PATCH 1/4] fix: the watch command should be executed in advance out of the pipeline of the transaction --- lib/redis_client/cluster.rb | 20 ++++++-- lib/redis_client/cluster/node_key.rb | 4 ++ .../cluster/optimistic_locking.rb | 48 +++++++++++++++++++ lib/redis_client/cluster/transaction.rb | 44 +++++------------ 4 files changed, 79 insertions(+), 37 deletions(-) create mode 100644 lib/redis_client/cluster/optimistic_locking.rb diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index cf6b0837..d342fdd4 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -6,6 +6,7 @@ require 'redis_client/cluster/router' require 'redis_client/cluster/transaction' require 'redis_client/cluster/pinning_node' +require 'redis_client/cluster/optimistic_locking' class RedisClient class Cluster @@ -91,9 +92,18 @@ def pipelined end def multi(watch: nil) - transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch) - yield transaction - transaction.execute + if watch.nil? || watch.empty? + transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder) + yield transaction + transaction.execute + else + locking = ::RedisClient::Cluster::OptimisticLocking.new(watch, @router) + locking.watch do |c| + transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, c) + yield transaction + transaction.execute + end + end end def pubsub @@ -102,11 +112,11 @@ def pubsub # TODO: This isn't an official public interface yet. Don't use in your production environment. # @see https://github.com/redis-rb/redis-cluster-client/issues/299 - def with(key: nil, hashtag: nil, write: true, _retry_count: 0, &_) + def with(key: nil, hashtag: nil, write: true) key = process_with_arguments(key, hashtag) node_key = @router.find_node_key_by_key(key, primary: write) node = @router.find_node(node_key) - yield ::RedisClient::Cluster::PinningNode.new(node) + node.with { |c| yield ::RedisClient::Cluster::PinningNode.new(c) } end def close diff --git a/lib/redis_client/cluster/node_key.rb b/lib/redis_client/cluster/node_key.rb index 946dcc93..c62ad80f 100644 --- a/lib/redis_client/cluster/node_key.rb +++ b/lib/redis_client/cluster/node_key.rb @@ -31,6 +31,10 @@ def build_from_uri(uri) def build_from_host_port(host, port) "#{host}#{DELIMITER}#{port}" end + + def build_from_client(client) + "#{client.config.host}#{DELIMITER}#{client.config.port}" + end end end end diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb new file mode 100644 index 00000000..35f50ebc --- /dev/null +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +require 'redis_client' +require 'redis_client/cluster/key_slot_converter' +require 'redis_client/cluster/transaction' + +class RedisClient + class Cluster + class OptimisticLocking + def initialize(keys, router) + @node = find_node!(keys, router) + @keys = keys + end + + def watch + @node.with do |c| + c.call('WATCH', *@keys) + reply = yield(c) + c.call('UNWATCH') + reply + end + end + + private + + def find_node!(keys, router) + raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" unless safe?(keys) + + node_key = router.find_primary_node_key(['WATCH', *keys]) + raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node" if node_key.nil? + + router.find_node(node_key) + end + + def safe?(keys) + return false if keys.empty? + + slots = keys.map do |k| + return false if k.nil? || k.empty? + + ::RedisClient::Cluster::KeySlotConverter.convert(k) + end + + slots.uniq.size == 1 + end + end + end +end diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 5b3eab9e..96bca7ea 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -2,21 +2,21 @@ require 'redis_client' require 'redis_client/cluster/pipeline' -require 'redis_client/cluster/key_slot_converter' +require 'redis_client/cluster/node_key' class RedisClient class Cluster class Transaction ConsistencyError = Class.new(::RedisClient::Error) - def initialize(router, command_builder, watch) + def initialize(router, command_builder, node = nil) @router = router @command_builder = command_builder - @watch = watch @retryable = true @pipeline = ::RedisClient::Pipeline.new(@command_builder) @pending_commands = [] - @node = nil + @node = node + prepare_tx unless @node.nil? end def call(*command, **kwargs, &block) @@ -62,7 +62,6 @@ def execute raise ArgumentError, 'empty transaction' if @pipeline._empty? raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil? - raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch? settle end @@ -74,25 +73,6 @@ def defer(&block) nil end - def watch? - !@watch.nil? && !@watch.empty? - end - - def safe_watch? - return true unless watch? - return false if @node.nil? - - slots = @watch.map do |k| - return false if k.nil? || k.empty? - - ::RedisClient::Cluster::KeySlotConverter.convert(k) - end - - return false if slots.uniq.size != 1 - - @router.find_primary_node_by_slot(slots.first) == @node - end - def prepare(command) return true unless @node.nil? @@ -100,16 +80,18 @@ def prepare(command) return false if node_key.nil? @node = @router.find_node(node_key) - @pipeline.call('WATCH', *@watch) if watch? + prepare_tx + true + end + + def prepare_tx @pipeline.call('MULTI') @pending_commands.each(&:call) @pending_commands.clear - true end def settle @pipeline.call('EXEC') - @pipeline.call('UNWATCH') if watch? send_transaction(@node, redirect: true) end @@ -133,11 +115,10 @@ def send_pipeline(client, redirect:) end end - offset = watch? ? 2 : 1 - coerce_results!(replies[-offset], offset) + coerce_results!(replies.last) end - def coerce_results!(results, offset) + def coerce_results!(results, offset: 1) results.each_with_index do |result, index| if result.is_a?(::RedisClient::CommandError) result._set_command(@pipeline._commands[index + offset]) @@ -171,8 +152,7 @@ def ensure_the_same_node!(commands) node_key = @router.find_primary_node_key(command) next if node_key.nil? - node = @router.find_node(node_key) - next if @node == node + next if NodeKey.build_from_client(@node) == node_key raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}" end From 322a68d8dbe42effb50c9a025c0d6612a5904eec Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sun, 18 Feb 2024 15:29:25 +0900 Subject: [PATCH 2/4] add some test cases --- lib/redis_client/cluster/transaction.rb | 2 ++ test/redis_client/cluster/test_node_key.rb | 14 ++++++++++++++ test/redis_client/test_cluster.rb | 21 +++++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 96bca7ea..546407b5 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -115,6 +115,8 @@ def send_pipeline(client, redirect:) end end + return if replies.last.nil? + coerce_results!(replies.last) end diff --git a/test/redis_client/cluster/test_node_key.rb b/test/redis_client/cluster/test_node_key.rb index 0ec6ca9e..c6b8e6e5 100644 --- a/test/redis_client/cluster/test_node_key.rb +++ b/test/redis_client/cluster/test_node_key.rb @@ -51,6 +51,20 @@ def test_build_from_host_port assert_equal(c[:want], got, "Case: #{idx}") end end + + def test_build_from_client + dummy_client = Struct.new(:config, keyword_init: true) + dummy_config = Struct.new(:host, :port, keyword_init: true) + dummy = dummy_client.new(config: dummy_config.new(host: '127.0.0.1', port: '6379')) + + [ + { client: dummy, want: '127.0.0.1:6379' }, + { client: ::RedisClient.new(host: '127.0.0.1', port: '6379'), want: '127.0.0.1:6379' } + ].each_with_index do |c, idx| + got = ::RedisClient::Cluster::NodeKey.build_from_client(c[:client]) + assert_equal(c[:want], got, "Case: #{idx}") + end + end end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 0dc651af..23721176 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -341,6 +341,27 @@ def test_transaction_with_block assert_equal(%w[a11 b22 c33], got) end + def test_transaction_in_race_condition + @client.call('MSET', '{key}1', '1', '{key}2', '2') + + another = Fiber.new do + cli = new_test_client + cli.call('MSET', '{key}1', '3', '{key}2', '4') + cli.close + Fiber.yield + end + + @client.multi(watch: %w[{key}1 {key}2]) do |tx| + another.resume + v1 = @client.call('GET', '{key}1') + v2 = @client.call('GET', '{key}1') + tx.call('SET', '{key}1', v2) + tx.call('SET', '{key}2', v1) + end + + assert_equal(%w[3 4], @client.call('MGET', '{key}1', '{key}2')) + end + def test_pubsub_without_subscription pubsub = @client.pubsub assert_nil(pubsub.next_event(0.01)) From 49e5560fe776abc9f5c81cc81d0760cc12b59b20 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sun, 18 Feb 2024 15:36:13 +0900 Subject: [PATCH 3/4] fix --- lib/redis_client/cluster/transaction.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 546407b5..17a3448f 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -150,11 +150,12 @@ def handle_command_error!(commands, err) end def ensure_the_same_node!(commands) + expected_node_key = NodeKey.build_from_client(@node) + commands.each do |command| node_key = @router.find_primary_node_key(command) next if node_key.nil? - - next if NodeKey.build_from_client(@node) == node_key + next if node_key == expected_node_key raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}" end From e5308773b7012f66e347e4e33150179a5f70209d Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sun, 18 Feb 2024 15:53:40 +0900 Subject: [PATCH 4/4] fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0b91e5ee..a0ecfee3 100644 --- a/README.md +++ b/README.md @@ -189,7 +189,7 @@ then it is guaranted to hash to the same slot (and thus always live on the same So, whilst it's not possible in Redis cluster to perform a transction on the keys `foo` and `bar`, it _is_ possible to perform a transaction on the keys `{tag}foo` and `{tag}bar`. -To perform such transactions on this gem, use `hashtag: +To perform such transactions on this gem, use the hashtag: ```ruby cli.multi do |tx|