diff --git a/README.md b/README.md index 13948010..0a62adbb 100644 --- a/README.md +++ b/README.md @@ -172,87 +172,101 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3') ## Transactions This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`, and conditional execution with `WATCH`. Redis does not support cross-node transactions, so all keys used within a -transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single connection using -`#with`. You can pass a single key, in order to perform multiple operations atomically on the same key, like so: +transaction must live in the same key slot. To use transactions, you can use `#multi` method same as the [redis-client](https://github.com/redis-rb/redis-client#usage): ```ruby -cli.with(key: 'my_cool_key') do |conn| - conn.multi do |m| - m.call('INC', 'my_cool_key') - m.call('INC', 'my_cool_key') - end - # my_cool_key will be incremented by 2, with no intermediate state visible to other clients +conn.multi do |tx| + tx.call('INCR', 'my_key') + tx.call('INCR', 'my_key') end ``` -More commonly, however, you will want to perform transactions across multiple keys. To do this, you need to ensure that all keys used in the transaction hash to the same slot; Redis a mechanism called [hashtags](https://redis.io/docs/reference/cluster-spec/#hash-tags) to achieve this. If a key contains a hashag (e.g. in the key `{foo}bar`, the hashtag is `foo`), then it is guaranted to hash to the same slot (and thus always live on the same node) as other keys which contain the same hashtag. +More commonly, however, you will want to perform transactions across multiple keys. To do this, +you need to ensure that all keys used in the transaction hash to the same slot; +Redis a mechanism called [hashtags](https://redis.io/docs/reference/cluster-spec/#hash-tags) to achieve this. +If a key contains a hashag (e.g. in the key `{foo}bar`, the hashtag is `foo`), +then it is guaranted to hash to the same slot (and thus always live on the same node) as other keys which contain the same hashtag. -So, whilst it's not possible in Redis cluster to perform a transction on the keys `foo` and `bar`, it _is_ possible to perform a transaction on the keys `{tag}foo` and `{tag}bar`. To perform such transactions on this gem, pass `hashtag:` to `#with` instead of `key`: +So, whilst it's not possible in Redis cluster to perform a transction on the keys `foo` and `bar`, +it _is_ possible to perform a transaction on the keys `{tag}foo` and `{tag}bar`. +To perform such transactions on this gem, use `hashtag: ```ruby -cli.with(hashtag: 'user123') do |conn| - # You can use any key which contains "{user123}" in this block - conn.multi do |m| - m.call('INC', '{user123}coins_spent') - m.call('DEC', '{user123}coins_available') - end +conn.multi do |tx| + tx.call('INCR', '{user123}coins_spent') + tx.call('DECR', '{user123}coins_available') end ``` -Once you have pinned a client to a particular slot, you can use the same transaction APIs as the -[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows. ```ruby -# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two. -cli.call('SET', 'key', 0) -cli.with(key: 'key') do |conn| - conn.multi do |txn| - txn.call('INCR', 'key') - txn.call('INCR', 'key') - end - #=> ['OK', 'OK'] -end # Conditional execution with WATCH can be used to e.g. atomically swap two keys cli.call('MSET', '{myslot}1', 'v1', '{myslot}2', 'v2') -cli.with(hashtag: 'myslot') do |conn| - conn.call('WATCH', '{myslot}1', '{myslot}2') - conn.multi do |txn| - old_key1 = conn.call('GET', '{myslot}1') - old_key2 = conn.call('GET', '{myslot}2') - txn.call('SET', '{myslot}1', old_key2) - txn.call('SET', '{myslot}2', old_key1) - end - # This transaction will swap the values of {myslot}1 and {myslot}2 only if no concurrent connection modified - # either of the values -end -# You can also pass watch: to #multi as a shortcut -cli.call('MSET', '{myslot}1', 'v1', '{myslot}2', 'v2') -cli.with(hashtag: 'myslot') do |conn| - conn.multi(watch: ['{myslot}1', '{myslot}2']) do |txn| - old_key1, old_key2 = conn.call('MGET', '{myslot}1', '{myslot}2') - txn.call('MSET', '{myslot}1', old_key2, '{myslot}2', old_key1) - end +conn.multi(watch: %w[{myslot}1 {myslot}2]) do |txn| + old_key1 = cli.call('GET', '{myslot}1') + old_key2 = cli.call('GET', '{myslot}2') + txn.call('SET', '{myslot}1', old_key2) + txn.call('SET', '{myslot}2', old_key1) end +# This transaction will swap the values of {myslot}1 and {myslot}2 only if no concurrent connection modified +# either of the values ``` -Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because -you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot -it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero -`retry_count` to `#with`. -```ruby -cli.with(hashtag: 'myslot', retry_count: 1) do |conn| - conn.call('GET', '{myslot}1') - #=> "value1" - # Now, some changes in cluster topology mean that {key} is moved to a different node! - conn.call('GET', '{myslot}2') - #=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError) - # Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered - # correct node. -end +`RedisClient::Cluster#multi` is aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, +but because you may have written non-idempotent code inside your block, the block is called once if e.g. the slot +it is operating on moves to a different node. + +#### IMO + +https://redis.io/docs/interact/transactions/#errors-inside-a-transaction + +> Errors happening after EXEC instead are not handled in a special way: all the other commands will be executed even if some command fails during the transaction. +> It's important to note that even when a command fails, all the other commands in the queue are processed - Redis will not stop the processing of commands. + ``` +$ telnet 127.0.0.1 6379 +set key3 a ++OK +multi ++OK +set key3 b ++QUEUED +incr key3 ++QUEUED +exec +*2 ++OK +-ERR value is not an integer or out of range +get key3 +$1 +b +``` + +The `SET` command was processed because the `INCR` command was queued. + +``` +multi ++OK +set key3 c ++QUEUED +mybad key3 d +-ERR unknown command 'mybad', with args beginning with: 'key3' 'd' +exec +-EXECABORT Transaction discarded because of previous errors. +get key3 +$1 +b +``` + +The `SET` command wasn't processed because of the error during the queueing. + +https://redis.io/docs/interact/transactions/#what-about-rollbacks + +> Redis does not support rollbacks of transactions since supporting rollbacks would have a significant impact on the simplicity and performance of Redis. -Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its -arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with` -call will pin the connection to a slot when using clustering, or be a no-op when not. +It's hard to validate them perfectly in advance on the client side. +It seems that Redis aims to prior simplicity and performance efficiency. +So I think it's wrong to use the transaction feature by complex ways. +To say nothing of the cluster mode because of the CAP theorem. Redis is just a key-value store. ## ACL The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly. diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 4bc8d3ef..fc90331b 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -5,6 +5,7 @@ require 'redis_client/cluster/pub_sub' require 'redis_client/cluster/router' require 'redis_client/cluster/transaction' +require 'redis_client/cluster/pinning_node' class RedisClient class Cluster @@ -95,19 +96,18 @@ def multi(watch: nil) transaction.execute end - def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) + def pubsub + ::RedisClient::Cluster::PubSub.new(@router, @command_builder) + end + + # TODO: This isn't an official public interface yet. Don't use in your production environment. + # @see https://github.com/redis-rb/redis-cluster-client/issues/299 + def with(key: nil, hashtag: nil, write: true, _retry_count: 0, &_) key = process_with_arguments(key, hashtag) node_key = @router.find_node_key_by_key(key, primary: write) node = @router.find_node(node_key) - # Calling #with checks out the underlying connection if this is a pooled connection - # Calling it through #try_delegate ensures we handle any redirections and retry the entire - # transaction if so. - @router.try_delegate(node, :with, retry_count: retry_count, &block) - end - - def pubsub - ::RedisClient::Cluster::PubSub.new(@router, @command_builder) + yield ::RedisClient::Cluster::PinningNode.new(node) end def close diff --git a/lib/redis_client/cluster/pinning_node.rb b/lib/redis_client/cluster/pinning_node.rb new file mode 100644 index 00000000..8d925095 --- /dev/null +++ b/lib/redis_client/cluster/pinning_node.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +class RedisClient + class Cluster + class PinningNode + def initialize(client) + @client = client + end + + def call(*args, **kwargs, &block) + @client.call(*args, **kwargs, &block) + end + + def call_v(args, &block) + @client.call_v(args, &block) + end + + def call_once(*args, **kwargs, &block) + @client.call_once(*args, **kwargs, &block) + end + + def call_once_v(args, &block) + @client.call_once_v(args, &block) + end + + def blocking_call(timeout, *args, **kwargs, &block) + @client.blocking_call(timeout, *args, **kwargs, &block) + end + + def blocking_call_v(timeout, args, &block) + @client.blocking_call_v(timeout, args, &block) + end + end + end +end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index d0c4af24..d081d37c 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -605,16 +605,6 @@ def test_pinning_two_keys assert_equal(%w[v1 v2], got) end - def test_pinning_cross_slot - skip 'This is not implemented yet!' - - assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do - @client.with(hashtag: 'slot1') do |conn| - conn.call('GET', '{slot2}') - end - end - end - def test_pinning_hashtag_with_braces got = @client.with(hashtag: '{slot}') do |conn| conn.call('SET', '{slot}key1', 'v1') @@ -624,174 +614,6 @@ def test_pinning_hashtag_with_braces assert_equal(%w[v1 v2], got) end - def test_pinning_pipeline - got = @client.with(hashtag: 'slot') do |conn| - conn.call_v(['SET', '{slot}counter', 0]) - conn.pipelined do |pipe| - pipe.call_v(['INCR', '{slot}counter']) - pipe.call_v(['INCR', '{slot}counter']) - pipe.call_v(['INCR', '{slot}counter']) - end - conn.call_v(['GET', '{slot}counter']).to_i - end - - assert_equal(3, got) - end - - def test_pinning_pipeline_with_error - assert_raises(RedisClient::CommandError) do - @client.with(hashtag: 'slot') do |conn| - conn.pipelined do |pipeline| - pipeline.call('SET', '{slot}key', 'first') - pipeline.call('SET', '{slot}key', 'second', 'too many args') - pipeline.call('SET', '{slot}key', 'third') - end - end - end - - wait_for_replication - assert_equal('third', @client.call('GET', '{slot}key')) - end - - def test_pinning_transaction - got = @client.with(hashtag: 'slot') do |conn| - conn.multi do |txn| - txn.call('SET', '{slot}key1', 'value1') - txn.call('SET', '{slot}key2', 'value2') - end - end - - assert_equal(%w[OK OK], got) - end - - def test_pinning_transaction_watch_arg - @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') - @captured_commands.clear - - got = @client.with(hashtag: 'slot') do |conn| - conn.multi(watch: ['{slot}key1', '{slot}key2']) do |txn| - old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') - txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) - end - end - - assert_equal([ - %w[WATCH {slot}key1 {slot}key2], - %w[MGET {slot}key1 {slot}key2], - %w[MULTI], - %w[MSET {slot}key1 val2 {slot}key2 val1], - %w[EXEC] - ], @captured_commands.to_a.map(&:command)) - - wait_for_replication - assert_equal(['OK'], got) - assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) - end - - def test_pinning_transaction_watch_arg_unwatches_on_raise - ex = Class.new(StandardError) - @captured_commands.clear - - assert_raises(ex) do - @client.with(hashtag: 'slot') do |conn| - conn.multi(watch: ['{slot}key1']) do |_txn| - conn.call('GET', '{slot}key1') - raise ex, 'boom' - end - end - end - - assert_equal([ - %w[WATCH {slot}key1], - %w[GET {slot}key1], - %w[UNWATCH] - ], @captured_commands.to_a.map(&:command)) - end - - def test_pinning_transaction_can_watch_manually - @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') - @captured_commands.clear - - got = @client.with(hashtag: 'slot') do |conn| - conn.call('WATCH', '{slot}key1', '{slot}key2') - old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') - conn.multi do |txn| - txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) - end - end - - assert_equal([ - %w[WATCH {slot}key1 {slot}key2], - %w[MGET {slot}key1 {slot}key2], - %w[MULTI], - %w[MSET {slot}key1 val2 {slot}key2 val1], - %w[EXEC] - ], @captured_commands.to_a.map(&:command)) - - wait_for_replication - assert_equal(['OK'], got) - assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) - end - - def test_pinning_transaction_can_unwatch_manually - got = @client.with(hashtag: 'slot') do |conn| - conn.call('WATCH', '{slot}key1') - conn.call('UNWATCH') - end - - assert_equal('OK', got) - end - - def test_pinning_timeouts_update_topology - # Create a new test client with a lower timeout for this test so it's fast. - captured_commands = CommandCaptureMiddleware::CommandBuffer.new - client = new_test_client(capture_buffer: captured_commands, timeout: 0.5) - - client.call('DEL', '{slot}list') - captured_commands.clear - - assert_raises(::RedisClient::ReadTimeoutError) do - client.with(hashtag: 'slot') do |conn| - conn.call_v(['BLPOP', '{slot}list', 0]) - end - end - assert_includes(captured_commands.to_a.map(&:command), %w[CLUSTER NODES]) - end - - def test_pinning_sscan - @client.call('DEL', '{slot}set') - expected_set = Set.new - scanned_set = Set.new - 1000.times do |i| - expected_set << i - @client.call('SADD', '{slot}set', i) - end - @client.with(hashtag: 'slot') do |conn| - conn.sscan('{slot}set') do |i| - scanned_set << i.to_i - end - end - - assert_equal(expected_set, scanned_set) - end - - def test_pinning_zscan - @client.call('DEL', '{slot}set') - expected_set = Set.new - scanned_set = Set.new - 1000.times do |i| - expected_set << "member#{i}" - @client.call('ZADD', '{slot}set', i, "member#{i}") - end - @client.with(hashtag: 'slot') do |conn| - conn.zscan('{slot}set') do |i| - scanned_set << i - end - end - - assert_equal(expected_set, scanned_set) - end - private def wait_for_replication