diff --git a/lib/dalli/elasticache.rb b/lib/dalli/elasticache.rb index 1fdf5f6..5f7f1da 100644 --- a/lib/dalli/elasticache.rb +++ b/lib/dalli/elasticache.rb @@ -10,7 +10,8 @@ class ElastiCache attr_reader :endpoint, :options def initialize(config_endpoint, options={}) - @endpoint = Dalli::Elasticache::AutoDiscovery::Endpoint.new(config_endpoint) + cluster_timeout = (options || {}).delete(:cluster_timeout) || (options || {}).delete('cluster_timeout') + @endpoint = Dalli::Elasticache::AutoDiscovery::Endpoint.new(config_endpoint, cluster_timeout) @options = options end @@ -40,7 +41,7 @@ def servers # Clear all cached data from the cluster endpoint def refresh config_endpoint = "#{endpoint.host}:#{endpoint.port}" - @endpoint = Dalli::Elasticache::AutoDiscovery::Endpoint.new(config_endpoint) + @endpoint = Dalli::Elasticache::AutoDiscovery::Endpoint.new(config_endpoint, endpoint.timeout) self end diff --git a/lib/dalli/elasticache/auto_discovery/endpoint.rb b/lib/dalli/elasticache/auto_discovery/endpoint.rb index 6e4c6a1..0a683af 100644 --- a/lib/dalli/elasticache/auto_discovery/endpoint.rb +++ b/lib/dalli/elasticache/auto_discovery/endpoint.rb @@ -2,10 +2,12 @@ module Dalli module Elasticache module AutoDiscovery class Endpoint + class Timeout < StandardError; end; # Endpoint configuration attr_reader :host attr_reader :port + attr_reader :timeout # Matches Strings like "my-host.cache.aws.com:11211" ENDPOINT_REGEX = /([-.a-zA-Z0-9]+):(\d+)/ @@ -15,11 +17,12 @@ class Endpoint # Legacy command for version < 1.4.14 OLD_CONFIG_COMMAND = "get AmazonElastiCache:cluster\r\n" - def initialize(endpoint) + def initialize(endpoint, timeout) ENDPOINT_REGEX.match(endpoint) do |m| @host = m[1] @port = m[2].to_i end + @timeout = timeout end # A cached ElastiCache::StatsResponse @@ -61,7 +64,7 @@ def get_config_from_remote # # Returns the raw response as a String def remote_command(command) - socket = TCPSocket.new(@host, @port) + socket = tcp_socket(@host, @port, @timeout) socket.puts command data = "" @@ -72,6 +75,53 @@ def remote_command(command) socket.close data end + + # Creates and connects a tcp socket with an optional timeout + # + # Returns a Socket or TCPSocket instance + def tcp_socket(host, port, timeout) + if timeout.nil? + TCPSocket.new(host, port) + else + # Convert the passed host into structures the non-blocking calls + # can deal with + addr = Socket.getaddrinfo(host, nil) + sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) + + Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0).tap do |socket| + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + begin + # Initiate the socket connection in the background. If it doesn't fail + # immediatelyit will raise an IO::WaitWritable (Errno::EINPROGRESS) + # indicating the connection is in progress. + socket.connect_nonblock(sockaddr) + + rescue IO::WaitWritable + # IO.select will block until the socket is writable or the timeout + # is exceeded - whichever comes first. + if IO.select(nil, [socket], nil, timeout) + begin + # Verify there is now a good connection + socket.connect_nonblock(sockaddr) + rescue Errno::EISCONN + return socket + # Good news everybody, the socket is connected! + rescue + # An unexpected exception was raised - the connection is no good. + socket.close + raise + end + else + # IO.select returns nil when the socket is not ready before timeout + # seconds have elapsed + socket.close + raise Timeout, "Connection attempt took longer than timeout of #{timeout} seconds" + end + end + end + end + end end end end diff --git a/spec/elasticache_spec.rb b/spec/elasticache_spec.rb index 90c87ae..47a83f0 100644 --- a/spec/elasticache_spec.rb +++ b/spec/elasticache_spec.rb @@ -5,7 +5,8 @@ options = { :expires_in => 24*60*60, :namespace => "my_app", - :compress => true + :compress => true, + :cluster_timeout => 5 } Dalli::ElastiCache.new("my-cluster.cfg.use1.cache.amazonaws.com:11211", options) end @@ -17,6 +18,7 @@ it 'builds endpoint' do cache.endpoint.host.should == "my-cluster.cfg.use1.cache.amazonaws.com" cache.endpoint.port.should == 11211 + cache.endpoint.timeout.should == 5 end it 'stores Dalli options' do diff --git a/spec/endpoint_spec.rb b/spec/endpoint_spec.rb index a8036d5..35b8683 100644 --- a/spec/endpoint_spec.rb +++ b/spec/endpoint_spec.rb @@ -2,7 +2,7 @@ describe 'Dalli::Elasticache::AutoDiscovery::Endpoint' do let(:endpoint) do - Dalli::Elasticache::AutoDiscovery::Endpoint.new("my-cluster.cfg.use1.cache.amazonaws.com:11211") + Dalli::Elasticache::AutoDiscovery::Endpoint.new("my-cluster.cfg.use1.cache.amazonaws.com:11211", nil) end describe '.new' do