Skip to content
This repository was archived by the owner on Jan 15, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions lib/moped/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,8 @@ def retry_interval
# @since 1.0.0
def with_primary(&block)
if node = nodes.find(&:primary?)
begin
node.ensure_primary do
return yield(node)
end
rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
node.ensure_primary do
return yield(node)
end
end
raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
Expand Down Expand Up @@ -275,6 +272,37 @@ def with_secondary(&block)
raise Errors::ConnectionFailure, "Could not connect to a secondary node for replica set #{inspect}"
end

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @example Execute with retry.
# cluster.with_retry do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(retries = max_retries, &block)
begin
block.call
rescue StandardError => e
raise e unless Failover::STRATEGIES[e.class] == Failover::Retry
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s). Exception: #{ e.class.name }, #{ e.message }", "n/a")
sleep(retry_interval)
refresh
with_retry(retries - 1, &block)
else
raise e
end
end
end

private

# Apply the credentials on all nodes
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ def initialize(database, name)
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
cluster.with_retry do
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
49 changes: 14 additions & 35 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class InvalidMongoURI < StandardError; end

# Raised when providing an invalid string from an object id.
class InvalidObjectId < StandardError

# Create the new error.
#
# @example Create the new error.
Expand Down Expand Up @@ -105,29 +104,22 @@ def error_message
end
end

# Classes of errors that should not disconnect connections.
class DoNotDisconnect < MongoError; end

# Classes of errors that could be caused by a replica set reconfiguration.
class PotentialReconfiguration < MongoError
# @api private
#
# Exception indicating that replica set was most likely reconfigured
class ReplicaSetReconfigured < MongoError; end

# Not master error codes.
NOT_MASTER = [ 13435, 13436, 10009 ]
# Exception raised when database responds with 'not master' error
class NotMaster < ReplicaSetReconfigured; end

# Error codes received around reconfiguration
CONNECTION_ERRORS_RECONFIGURATION = [ 15988, 10276, 11600, 9001, 13639, 10009 ]
# Exception raised when authentication fails.
class AuthenticationFailure < MongoError; end

# Replica set reconfigurations can be either in the form of an operation
# error with code 13435, or with an error message stating the server is
# not a master. (This encapsulates codes 10054, 10056, 10058)
def reconfiguring_replica_set?
err = details["err"] || details["errmsg"] || details["$err"] || ""
NOT_MASTER.include?(details["code"]) || err.include?("not master")
end
# Exception raised when authorization fails.
class AuthorizationFailure < MongoError; end

def connection_failure?
CONNECTION_ERRORS_RECONFIGURATION.include?(details["code"])
end
# Exception raised when operation fails
class OperationFailure < MongoError

# Is the error due to a namespace not being found?
#
Expand All @@ -154,29 +146,16 @@ def ns_not_exists?
end
end

# Exception raised when authentication fails.
class AuthenticationFailure < DoNotDisconnect; end

# Exception class for exceptions generated as a direct result of an
# operation, such as a failed insert or an invalid command.
class OperationFailure < PotentialReconfiguration; end

# Exception raised on invalid queries.
class QueryFailure < PotentialReconfiguration; end
class QueryFailure < MongoError; end

# Exception raised if the cursor could not be found.
class CursorNotFound < DoNotDisconnect
class CursorNotFound < MongoError
def initialize(operation, cursor_id)
super(operation, {"errmsg" => "cursor #{cursor_id} not found"})
end
end

# @api private
#
# Internal exception raised by Node#ensure_primary and captured by
# Cluster#with_primary.
class ReplicaSetReconfigured < DoNotDisconnect; end

# Tag applied to unhandled exceptions on a node.
module SocketError; end
end
Expand Down
8 changes: 5 additions & 3 deletions lib/moped/failover.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# encoding: utf-8
require "moped/failover/disconnect"
require "moped/failover/ignore"
require "moped/failover/reconfigure"
require "moped/failover/retry"

module Moped
Expand All @@ -18,10 +17,13 @@ module Failover
# @since 2.0.0
STRATEGIES = {
Errors::AuthenticationFailure => Ignore,
Errors::AuthorizationFailure => Retry,
Errors::ConnectionFailure => Retry,
Errors::CursorNotFound => Ignore,
Errors::OperationFailure => Reconfigure,
Errors::QueryFailure => Reconfigure
Errors::OperationFailure => Ignore,
Errors::QueryFailure => Ignore,
Errors::NotMaster => Retry,
Errors::ReplicaSetReconfigured => Retry,
}.freeze

# Get the appropriate failover handler given the provided exception.
Expand Down
36 changes: 0 additions & 36 deletions lib/moped/failover/reconfigure.rb

This file was deleted.

2 changes: 1 addition & 1 deletion lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Failover
module Retry
extend self

# Executes the failover strategy. In the case of retyr, we disconnect and
# Executes the failover strategy. In the case of retry, we disconnect and
# reconnect, then try the operation one more time.
#
# @example Execute the retry strategy.
Expand Down
26 changes: 13 additions & 13 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,16 @@ def down!
#
# @since 1.0.0
def ensure_connected(&block)
unless (conn = stack(:connection)).empty?
return yield(conn.first)
end

begin
connection do |conn|
stack(:connection) << conn
connect(conn) unless conn.connected?
conn.apply_credentials(@credentials)
yield(conn)
if (conn = stack(:connection)).length > 0
yield(conn.first)
else
connection do |conn|
stack(:connection) << conn
connect(conn) unless conn.connected?
conn.apply_credentials(@credentials)
yield(conn)
end
end
rescue Exception => e
Failover.get(e).execute(e, self, &block)
Expand Down Expand Up @@ -548,6 +548,7 @@ def configure(settings)
@arbiter = settings["arbiterOnly"]
@passive = settings["passive"]
@primary = settings["ismaster"]
@down_at = nil
@secondary = settings["secondary"]
discover(settings["hosts"]) if auto_discovering?
end
Expand Down Expand Up @@ -585,14 +586,13 @@ def discover(*nodes)
def flush(ops = queue)
operations, callbacks = ops.transpose
logging(operations) do
replies = nil
ensure_connected do |conn|
conn.write(operations)
replies = conn.receive_replies(operations)
replies.zip(callbacks).map do |reply, callback|
callback ? callback.call(reply) : reply
end.last
end
replies.zip(callbacks).map do |reply, callback|
callback ? callback[reply] : reply
end.last
end
ensure
ops.clear
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/protocol/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def failure?(reply)
#
# @since 2.0.0
def failure_exception(reply)
Errors::OperationFailure.new(self, reply.documents.first)
reply.failure_exception || Errors::OperationFailure.new(self, reply.documents.first)
end

# Instantiate the new command.
Expand Down
7 changes: 2 additions & 5 deletions lib/moped/protocol/get_more.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,8 @@ def failure?(reply)
#
# @since 2.0.0
def failure_exception(reply)
if reply.cursor_not_found?
Errors::CursorNotFound.new(self, cursor_id)
else
Errors::QueryFailure.new(self, reply.documents.first)
end
return Errors::CursorNotFound.new(self, cursor_id) if reply.cursor_not_found?
reply.failure_exception || Errors::QueryFailure.new(self, reply.documents.first)
end

# Create a new GetMore command. The database and collection arguments
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/protocol/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def basic_selector
#
# @since 2.0.0
def failure_exception(reply)
Errors::QueryFailure.new(self, reply.documents.first)
reply.failure_exception || Errors::QueryFailure.new(self, reply.documents.first)
end

# Determine if the provided reply message is a failure with respect to a
Expand Down
45 changes: 43 additions & 2 deletions lib/moped/protocol/reply.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,24 @@ module Protocol
class Reply
include Message

# Unauthorized assertion errors.
UNAUTHORIZED = [ 10057, 16550 ]
# Error codes
UNAUTHORIZED = [
13, # not authorized for query on ...
10057,
16550, # not authorized for query on ...
16544, # not authorized for insert on ...
]
NOT_MASTER = [ 13435, 13436, 10009, 10054, 10056, 10058, 10107]

CONNECTION_ERRORS_RECONFIGURATION = [
9001,
10009,
10276, # DBClientBase::findN: transport error
11600, # interrupted at shutdown
13639, # can't connect to new replica set master
15988, # error querying server
]


# @attribute
# @return [Number] the length of the message
Expand Down Expand Up @@ -71,6 +87,31 @@ def command_failure?
(result["ok"] != 1.0 && result["ok"] != true) || error?
end

# Returns specific exception if it can be determined from error code or message returned by DB
def failure_exception
return Errors::AuthorizationFailure.new(self, documents.first) if unauthorized?
return Errors::NotMaster.new(self, documents.first) if not_master?
return Errors::ReplicaSetReconfigured.new(self, documents.first) if connection_failure?
end

# Error codes received around reconfiguration
def connection_failure?
result = documents[0]
return false if result.nil?
CONNECTION_ERRORS_RECONFIGURATION.include?(result["code"])
end

# Not master error codes.
# Replica set reconfigurations can be either in the form of an operation
# error falling into ReplicaSetReconfigured error or with an error message stating the server is
# not a master.
def not_master?
result = documents[0]
return false if result.nil?
err = error_message(result)
NOT_MASTER.include?(result["code"]) || err && err.include?("not master")
end

# Was the provided cursor id not found on the server?
#
# @example Is the cursor not on the server?
Expand Down
Loading