From a273251a5da75c8ea7df3675db9e870be909fc58 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Tue, 20 Feb 2024 13:50:59 +0900 Subject: [PATCH 1/2] [WIP] fix: concurrency for the transaction --- test/test_concurrency.rb | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/test/test_concurrency.rb b/test/test_concurrency.rb index 4efbe6a9..7cddb1ba 100644 --- a/test/test_concurrency.rb +++ b/test/test_concurrency.rb @@ -54,6 +54,27 @@ def test_forking_with_pipelining MAX_THREADS.times { |i| assert_equal(WANT, @client.call('GET', "key#{i}")) } end + def test_forking_with_transaction + skip("fork is not available on #{RUBY_ENGINE}") if %w[jruby truffleruby].include?(RUBY_ENGINE) + + @client.call('SET', '{key}1', WANT) + + pids = Array.new(MAX_THREADS) do + Process.fork do + @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('INCR', '{key}1') } } } + sleep 0.1 + @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('DECR', '{key}1') } } } + end + end + + pids.each do |pid| + _, status = Process.waitpid2(pid) + assert_predicate(status, :success?) + end + + assert_equal(WANT, @client.call('GET', '{key}1')) + end + def test_threading threads = Array.new(MAX_THREADS) do Thread.new do @@ -84,6 +105,23 @@ def test_threading_with_pipelining MAX_THREADS.times { |i| assert_equal(WANT, @client.call('GET', "key#{i}")) } end + def test_threading_with_transaction + @client.call('SET', '{key}1', WANT) + + threads = Array.new(MAX_THREADS) do + Thread.new do + @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('INCR', '{key}1') } } } + @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('DECR', '{key}1') } } } + nil + rescue StandardError => e + e + end + end + + threads.each { |t| assert_nil(t.value) } + assert_equal(WANT, @client.call('GET', '{key}1')) + end + private def new_test_client From 6932d94eb9e4e79a57230d2e7167c07ffe010293 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Tue, 20 Feb 2024 14:05:26 +0900 Subject: [PATCH 2/2] fix --- test/test_concurrency.rb | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/test/test_concurrency.rb b/test/test_concurrency.rb index 7cddb1ba..0f816cf1 100644 --- a/test/test_concurrency.rb +++ b/test/test_concurrency.rb @@ -61,9 +61,14 @@ def test_forking_with_transaction pids = Array.new(MAX_THREADS) do Process.fork do - @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('INCR', '{key}1') } } } - sleep 0.1 - @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('DECR', '{key}1') } } } + @client.multi(watch: %w[{key}1]) do |tx| + ATTEMPTS.times do + MAX_THREADS.times do + tx.call('INCR', '{key}1') + tx.call('DECR', '{key}1') + end + end + end end end @@ -110,15 +115,20 @@ def test_threading_with_transaction threads = Array.new(MAX_THREADS) do Thread.new do - @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('INCR', '{key}1') } } } - @client.multi(watch: %w[{key}1]) { |tx| ATTEMPTS.times { MAX_THREADS.times { tx.call('DECR', '{key}1') } } } - nil + @client.multi(watch: %w[{key}1]) do |tx| + ATTEMPTS.times do + MAX_THREADS.times do + tx.call('INCR', '{key}1') + tx.call('DECR', '{key}1') + end + end + end rescue StandardError => e e end end - threads.each { |t| assert_nil(t.value) } + threads.each { |t| refute_instance_of(StandardError, t.value) } assert_equal(WANT, @client.call('GET', '{key}1')) end