diff --git a/lib/moped/connection.rb b/lib/moped/connection.rb
index 08aa576..e20a8d8 100644
--- a/lib/moped/connection.rb
+++ b/lib/moped/connection.rb
@@ -191,12 +191,23 @@ def write(operations)
     #
     # @since 1.2.9
     def read_data(socket, length)
-      data = socket.read(length)
-      unless data
-        raise Errors::ConnectionFailure.new(
-          "Attempted to read #{length} bytes from the socket but nothing was returned."
-        )
+      # Block waiting for data for at most op_timeout seconds, using the
+      # implementation suggested in http://www.ruby-doc.org/core-2.1.3/Kernel.html#method-i-select
+      # so that it also works with SSL connections.
+      time_left = op_timeout = @options[:op_timeout] || timeout
+      begin
+        raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0
+        data = socket.read_nonblock(length)
+      rescue IO::WaitReadable
+        Kernel::select([socket], nil, [socket], 0.1)
+        retry
+      rescue IO::WaitWritable
+        Kernel::select(nil, [socket], [socket], 0.1)
+        retry
+      rescue SystemCallError, IOError => e
+        raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happened: #{e.message}.")
       end
+
       if data.length < length
         data << read_data(socket, length - data.length)
       end
diff --git a/lib/moped/errors.rb b/lib/moped/errors.rb
index 183df14..1a87ffb 100644
--- a/lib/moped/errors.rb
+++ b/lib/moped/errors.rb
@@ -20,6 +20,9 @@ class PoolTimeout < RuntimeError; end
     # Generic error class for exceptions related to connection failures.
     class ConnectionFailure < StandardError; end
 
+    # Generic error class for exceptions related to read timeout failures.
+    class OperationTimeout < StandardError; end
+
     # Raised when a database name is invalid.
     class InvalidDatabaseName < StandardError; end
 
diff --git a/lib/moped/failover.rb b/lib/moped/failover.rb
index d893adc..37d2ce8 100644
--- a/lib/moped/failover.rb
+++ b/lib/moped/failover.rb
@@ -21,7 +21,8 @@ module Failover
       Errors::ConnectionFailure => Retry,
       Errors::CursorNotFound => Ignore,
       Errors::OperationFailure => Reconfigure,
-      Errors::QueryFailure => Reconfigure
+      Errors::QueryFailure => Reconfigure,
+      Errors::PoolTimeout => Retry
     }.freeze
 
     # Get the appropriate failover handler given the provided exception.
diff --git a/lib/moped/failover/retry.rb b/lib/moped/failover/retry.rb
index bb8b091..6de08e1 100644
--- a/lib/moped/failover/retry.rb
+++ b/lib/moped/failover/retry.rb
@@ -9,7 +9,7 @@ module Failover
     module Retry
       extend self
 
-      # Executes the failover strategy. In the case of retyr, we disconnect and
+      # Executes the failover strategy. In the case of retry, we disconnect and
       # reconnect, then try the operation one more time.
       #
       # @example Execute the retry strategy.
@@ -24,11 +24,13 @@ module Retry
       #
       # @since 2.0.0
       def execute(exception, node)
-        node.disconnect
+        node.disconnect unless exception.is_a?(Errors::PoolTimeout)
         begin
           node.connection do |conn|
             yield(conn) if block_given?
           end
+        rescue Errors::PoolTimeout => e
+          raise Errors::ConnectionFailure.new e
         rescue Exception => e
           node.down!
           raise(e)
diff --git a/lib/moped/node.rb b/lib/moped/node.rb
index 0cafd4e..e8f0f3a 100644
--- a/lib/moped/node.rb
+++ b/lib/moped/node.rb
@@ -111,8 +111,14 @@ def connected?
     #
     # @since 2.0.0
     def connection
-      pool.with do |conn|
-        yield(conn)
+      connection_acquired = false
+      begin
+        pool.with do |conn|
+          connection_acquired = true
+          yield(conn)
+        end
+      rescue Timeout::Error => e
+        raise connection_acquired ? e : Errors::PoolTimeout.new(e)
       end
     end
 
@@ -156,6 +162,12 @@ def down!
       Connection::Manager.shutdown(self)
     end
 
+    def flush_connection_credentials
+      connection do |conn|
+        conn.credentials.clear
+      end
+    end
+
     # Yields the block if a connection can be established, retrying when a
     # connection error is raised.
     #
diff --git a/lib/moped/retryable.rb b/lib/moped/retryable.rb
index 906b094..251adc7 100644
--- a/lib/moped/retryable.rb
+++ b/lib/moped/retryable.rb
@@ -29,12 +29,13 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
      begin
        block.call
      rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
-        raise e if e.is_a?(Errors::PotentialReconfiguration) &&
-          ! (e.message.include?("not master") || e.message.include?("Not primary"))
+        authentication_error = e.is_a?(Errors::PotentialReconfiguration) && e.message.match(/not (master|primary|authorized)/i)
+        raise e if e.is_a?(Errors::PotentialReconfiguration) && !authentication_error
 
        if retries > 0
          Loggable.warn("  MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
          sleep(cluster.retry_interval)
+          cluster.nodes.each { |node| node.flush_connection_credentials } if authentication_error
          cluster.refresh
          with_retry(cluster, retries - 1, &block)
        else
diff --git a/lib/moped/session.rb b/lib/moped/session.rb
index 54203fa..33cfd73 100644
--- a/lib/moped/session.rb
+++ b/lib/moped/session.rb
@@ -240,6 +240,11 @@ def logout
    # @since 2.0.0
    option(:timeout).allow(Optionable.any(Numeric))
 
+    # Setup validation of the allowed op_timeout option. (Any numeric)
+    #
+    # @since 2.0.0
+    option(:op_timeout).allow(Optionable.any(Numeric))
+
    # Pass an object that responds to instrument as an instrumenter.
    #
    # @since 2.0.0
diff --git a/spec/moped/node_spec.rb b/spec/moped/node_spec.rb
index aa88688..1013c2f 100644
--- a/spec/moped/node_spec.rb
+++ b/spec/moped/node_spec.rb
@@ -486,4 +486,47 @@
      end
    end
  end
+
+  describe "#connection" do
+    let(:node) do
+      described_class.new("127.0.0.1:27017", pool_size: 1, pool_timeout: 0.1)
+    end
+
+    context "when it takes a long time to get a connection from the pool" do
+      it "raises an Errors::PoolTimeout error" do
+        expect {
+
+          exception = nil
+          100.times.map do |i|
+            Thread.new do
+              begin
+                node.connection do |conn|
+                  conn.apply_credentials({})
+                  node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
+                end
+              rescue => e
+                exception = e if exception.nil?
+              end
+            end
+          end.each {|t| t.join }
+          raise exception unless exception.nil?
+
+        }.to raise_error(Moped::Errors::PoolTimeout)
+      end
+    end
+
+    context "when the timeout happens after getting a connection from the pool" do
+      it "raises a Timeout::Error" do
+        expect {
+          node.connection do |conn|
+            Timeout::timeout(0.01) do
+              conn.apply_credentials({})
+              node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
+              sleep(0.1) # just to simulate a long block which raises a timeout
+            end
+          end
+        }.to raise_error(Timeout::Error)
+      end
+    end
+  end
 end
diff --git a/spec/moped/query_spec.rb b/spec/moped/query_spec.rb
index 534259c..f94b75f 100644
--- a/spec/moped/query_spec.rb
+++ b/spec/moped/query_spec.rb
@@ -1018,6 +1018,42 @@
    end
  end
 
+  context "with test commands enabled" do
+
+    let(:session) do
+      Moped::Session.new([ "127.0.0.1:#{port}" ], database: "moped_test")
+    end
+
+    let(:users) do
+      session.with(safe: true)[:users]
+    end
+
+    describe "when a query takes too long" do
+      let(:port) { 31104 }
+
+      before do
+        start_mongo_server(port, "--setParameter enableTestCommands=1")
+        Process.detach(spawn("echo 'db.adminCommand({sleep: 1, w: true, secs: 10})' | mongo localhost:#{port} 2>&1 > /dev/null"))
+        sleep 1 # give the sleep command on mongodb time to start
+      end
+
+      after do
+        stop_mongo_server(port)
+      end
+
+      it "raises an operation timeout exception" do
+        time = Benchmark.realtime do
+          expect {
+            Timeout::timeout(7) do
+              users.find("age" => { "$gte" => 65 }).first
+            end
+          }.to raise_exception("Took more than 5 seconds to receive data.")
+        end
+        expect(time).to be < 5.5
+      end
+    end
+  end
+
  context "with a remote connection", mongohq: :auth do
 
    before(:all) do
diff --git a/spec/moped/session_spec.rb b/spec/moped/session_spec.rb
index 6d4005f..4706171 100644
--- a/spec/moped/session_spec.rb
+++ b/spec/moped/session_spec.rb
@@ -347,4 +347,24 @@
      nodes.last.should be_down
    end
  end
+
+  context "when connections in the pool are busy" do
+    let(:session) do
+      Moped::Session.new([ "127.0.0.1:27017" ], database: "moped_test", pool_size: 1, pool_timeout: 0.2, max_retries: 30, retry_interval: 1)
+    end
+
+    it "should retry the operation" do
+      session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})
+
+      results = []
+
+      300.times.map do |i|
+        Thread.new do
+          results.push session[:test].find({ name: "test_counter" }).first["cnt"]
+        end
+      end.each {|t| t.join }
+
+      expect(results.count).to eql(300)
+    end
+  end
 end
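
Usage note, not part of the patch: the sketch below shows how the new :op_timeout session option and the errors introduced above are expected to surface to callers. It assumes the session options propagate down to Moped::Connection (read_data falls back to @options[:op_timeout] || timeout) and that Errors::OperationTimeout reaches the caller with its original message, as exercised in the query_spec change.

# Minimal, hypothetical usage sketch in Ruby; names and values are illustrative.
require "moped"

session = Moped::Session.new(
  [ "127.0.0.1:27017" ],
  database:   "moped_test",
  timeout:    5,  # existing connect/read timeout option
  op_timeout: 5   # new option validated in session.rb and read by Connection#read_data
)

begin
  session[:users].find("age" => { "$gte" => 65 }).first
rescue Moped::Errors::OperationTimeout => e
  # Raised by read_data once a single read has waited longer than op_timeout seconds.
  warn(e.message)
rescue Moped::Errors::ConnectionFailure => e
  # Pool exhaustion (Errors::PoolTimeout) is now handled by the Retry failover
  # strategy and re-raised as a ConnectionFailure when the immediate retry fails too.
  warn(e.message)
end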