Skip to content

Commit aa408ff

Browse files
authored
test: add stream command cases (#418)
1 parent 3c7aa6e commit aa408ff

File tree

2 files changed

+56
-5
lines changed

2 files changed

+56
-5
lines changed

lib/redis_client/cluster/command.rb

+15-5
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,28 @@ def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize
4646

4747
private
4848

49-
def parse_command_reply(rows) # rubocop:disable Metrics/CyclomaticComplexity
49+
def parse_command_reply(rows) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
5050
rows&.each_with_object({}) do |row, acc|
5151
next if row.first.nil?
5252

53+
# TODO: in redis 7.0 or later, subcommand information included in the command reply
54+
5355
pos = case row.first
5456
when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3
55-
when 'object' then 2
57+
when 'object', 'xgroup' then 2
5658
when 'migrate', 'xread', 'xreadgroup' then 0
5759
else row[3]
5860
end
5961

62+
writable = case row.first
63+
when 'xgroup' then true
64+
else row[2].include?('write')
65+
end
66+
6067
acc[row.first] = ::RedisClient::Cluster::Command::Detail.new(
6168
first_key_position: pos,
6269
key_step: row[5],
63-
write?: row[2].include?('write'),
70+
write?: writable,
6471
readonly?: row[2].include?('readonly')
6572
)
6673
end.freeze || EMPTY_HASH
@@ -115,8 +122,11 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo
115122
end
116123

117124
def determine_optional_key_position(command, option_name)
118-
idx = command.map { |e| e.to_s.downcase(:ascii) }.index(option_name)
119-
idx.nil? ? 0 : idx + 1
125+
command.each_with_index do |e, i|
126+
return i + 1 if e.to_s.downcase(:ascii) == option_name
127+
end
128+
129+
0
120130
end
121131
end
122132
end

test/redis_client/test_cluster.rb

+41
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,47 @@ def test_other_pubsub_commands
711711
ps.close
712712
end
713713

714+
def test_stream_commands
715+
@client.call('xadd', '{stream}1', '*', 'mesage', 'foo')
716+
@client.call('xadd', '{stream}1', '*', 'mesage', 'bar')
717+
@client.call('xadd', '{stream}2', '*', 'mesage', 'baz')
718+
@client.call('xadd', '{stream}2', '*', 'mesage', 'zap')
719+
wait_for_replication
720+
721+
consumer = new_test_client
722+
got = consumer.call('xread', 'streams', '{stream}1', '{stream}2', '0', '0')
723+
consumer.close
724+
725+
got = got.to_h if TEST_REDIS_MAJOR_VERSION < 6
726+
727+
assert_equal('foo', got.fetch('{stream}1')[0][1][1])
728+
assert_equal('bar', got.fetch('{stream}1')[1][1][1])
729+
assert_equal('baz', got.fetch('{stream}2')[0][1][1])
730+
assert_equal('zap', got.fetch('{stream}2')[1][1][1])
731+
end
732+
733+
def test_stream_group_commands
734+
@client.call('xadd', '{stream}1', '*', 'task', 'data1')
735+
@client.call('xadd', '{stream}1', '*', 'task', 'data2')
736+
@client.call('xgroup', 'create', '{stream}1', 'worker', '0')
737+
wait_for_replication
738+
739+
consumer1 = new_test_client
740+
consumer2 = new_test_client
741+
got1 = consumer1.call('xreadgroup', 'group', 'worker', 'consumer1', 'count', '1', 'streams', '{stream}1', '>')
742+
got2 = consumer2.call('xreadgroup', 'group', 'worker', 'consumer2', 'count', '1', 'streams', '{stream}1', '>')
743+
consumer1.close
744+
consumer2.close
745+
746+
if TEST_REDIS_MAJOR_VERSION < 6
747+
got1 = got1.to_h
748+
got2 = got2.to_h
749+
end
750+
751+
assert_equal('data1', got1.fetch('{stream}1')[0][1][1])
752+
assert_equal('data2', got2.fetch('{stream}1')[0][1][1])
753+
end
754+
714755
def test_with_method
715756
assert_raises(NotImplementedError) { @client.with }
716757
end

0 commit comments

Comments
 (0)