From 1a14b63851ff77263ee27fcfb3dd7f6e25f3df66 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sun, 13 Apr 2014 08:03:33 +0100 Subject: [PATCH 01/10] synapse etcd service watcher --- lib/synapse/service_watcher.rb | 3 + lib/synapse/service_watcher/etcd.rb | 144 ++++++++++++++++++++++++++++ synapse.gemspec | 1 + 3 files changed, 148 insertions(+) create mode 100644 lib/synapse/service_watcher/etcd.rb diff --git a/lib/synapse/service_watcher.rb b/lib/synapse/service_watcher.rb index ee05e6c2..a5fb143a 100644 --- a/lib/synapse/service_watcher.rb +++ b/lib/synapse/service_watcher.rb @@ -4,6 +4,7 @@ require "synapse/service_watcher/dns" require "synapse/service_watcher/docker" require "synapse/service_watcher/zookeeper_dns" +require "synapse/service_watcher/etcd" module Synapse class ServiceWatcher @@ -15,6 +16,7 @@ class ServiceWatcher 'dns' => DnsWatcher, 'docker' => DockerWatcher, 'zookeeper_dns' => ZookeeperDnsWatcher, + 'etcd' => EtcdWatcher } # the method which actually dispatches watcher creation requests @@ -32,3 +34,4 @@ def self.create(name, opts, synapse) end end end + diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb new file mode 100644 index 00000000..e40ab612 --- /dev/null +++ b/lib/synapse/service_watcher/etcd.rb @@ -0,0 +1,144 @@ +require "synapse/service_watcher/base" + +require 'etcd' + +# Monkeypatch till 91f9e72d6d57ae3760e9266835f404d986072590 gets to rubygems.. +module Etcd + module Keys + def watch(key, opts = {}) + params = { wait: true } + fail ArgumentError, 'Second argument must be a hash' unless opts.is_a?(Hash) + timeout = opts[:timeout] || @read_timeout + index = opts[:waitIndex] || opts[:index] + params[:waitIndex] = index unless index.nil? + params[:consistent] = opts[:consistent] if opts.key?(:consistent) + params[:recursive] = opts[:recursive] if opts.key?(:recursive) + + response = api_execute(key_endpoint + key, :get, + timeout: timeout, params: params) + Response.from_http_response(response) + end + end +end + +module Synapse + class EtcdWatcher < BaseWatcher + NUMBERS_RE = /^\d+$/ + + def start + etcd_hosts = @discovery['host'] + + log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" + @should_exit = false + @etcd = ::Etcd.client(:host => @discovery['host'], :port => @discovery['port']) + + # call the callback to bootstrap the process + discover + @synapse.reconfigure! + watch + end + + def stop + log.warn "synapse: etcd watcher exiting" + + @should_exit = true + @etcd = nil + + log.info "synapse: etcd watcher cleaned up successfully" + end + + def ping? + @etcd.leader + end + + private + def validate_discovery_opts + raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ + unless @discovery['method'] == 'etcd' + raise ArgumentError, "missing or invalid etcd host for service #{@name}" \ + unless @discovery['host'] + raise ArgumentError, "missing or invalid etcd port for service #{@name}" \ + unless @discovery['port'] + raise ArgumentError, "invalid etcd path for service #{@name}" \ + unless @discovery['path'] + end + + # helper method that ensures that the discovery path exists + def create(path) + log.debug "synapse: creating etcd path: #{path}" + @etcd.create(path, dir: true) + end + + # find the current backends at the discovery path; sets @backends + def discover + log.info "synapse: discovering backends for service #{@name}" + + new_backends = [] + d = nil + begin + d = @etcd.get(@discovery['path']) + rescue Etcd::KeyNotFound + create(@discovery['path']) + d = @etcd.get(@discovery['path']) + end + + if d.directory? + d.children.each do |node| + begin + host, port, name = deserialize_service_instance(node.value) + rescue StandardError => e + log.error "synapse: invalid data in etcd node #{id} at #{@discovery['path']}: #{e}" + else + server_port = @server_port_override ? @server_port_override : port + + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = node.key.split('/').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + + log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" + new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + end + end + else + log.warn "synapse: path #{@discovery['path']} is not a directory" + end + + if new_backends.empty? + if @default_servers.empty? + log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + else + log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" + @backends = @default_servers + end + else + log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" + @backends = new_backends + end + end + + def watch + while !@should_exit + begin + @etcd.watch(@discovery['path'], :timeout => 60, :recursive => true) + rescue Timeout::Error + else + discover + @synapse.reconfigure! + end + end + end + + # decode the data at a zookeeper endpoint + def deserialize_service_instance(data) + log.debug "synapse: deserializing process data" + decoded = JSON.parse(data) + + host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') + port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') + name = decoded['name'] || nil + + return host, port, name + end + end +end + diff --git a/synapse.gemspec b/synapse.gemspec index cf3e0f78..05a79339 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -18,6 +18,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "zk", "~> 1.9.4" gem.add_runtime_dependency "docker-api", "~> 1.7.2" + gem.add_runtime_dependency "etcd", "~> 0.2.3" gem.add_development_dependency "rake" gem.add_development_dependency "rspec" From 14fde33bbfb0015d7d54ad4ca84935aaee867dfa Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sun, 13 Apr 2014 08:45:24 +0100 Subject: [PATCH 02/10] Add etcd to Gemfile.lock --- Dockerfile | 5 +++++ Gemfile.lock | 49 ++++++++++++++++++++++++------------------------- 2 files changed, 29 insertions(+), 25 deletions(-) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..f936f767 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM phusion/baseimage:0.9.10 +ENV HOME /root +CMD ["/sbin/my_init"] +RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + diff --git a/Gemfile.lock b/Gemfile.lock index 1009d53a..654f9b37 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,57 +3,56 @@ PATH specs: synapse (0.11.1) docker-api (~> 1.7.2) + etcd (~> 0.2.3) zk (~> 1.9.4) GEM remote: https://rubygems.org/ specs: archive-tar-minitar (0.5.2) - coderay (1.0.9) - diff-lcs (1.2.4) + coderay (1.1.0) + diff-lcs (1.2.5) docker-api (1.7.6) archive-tar-minitar excon (>= 0.28) json - excon (0.32.1) - ffi (1.9.3-java) + etcd (0.2.3) + mixlib-log + excon (0.37.0) json (1.8.1) little-plugger (1.1.3) logging (1.8.2) little-plugger (>= 1.1.3) multi_json (>= 1.8.4) method_source (0.8.2) - multi_json (1.9.2) - pry (0.9.12.2) - coderay (~> 1.0.5) + mixlib-log (1.6.0) + multi_json (1.10.1) + pry (0.9.12.6) + coderay (~> 1.0) method_source (~> 0.8) slop (~> 3.4) - pry (0.9.12.2-java) - coderay (~> 1.0.5) - method_source (~> 0.8) - slop (~> 3.4) - spoon (~> 0.0) pry-nav (0.2.3) pry (~> 0.9.10) - rake (10.1.1) - rspec (2.14.1) - rspec-core (~> 2.14.0) - rspec-expectations (~> 2.14.0) - rspec-mocks (~> 2.14.0) - rspec-core (2.14.5) - rspec-expectations (2.14.2) - diff-lcs (>= 1.1.3, < 2.0) - rspec-mocks (2.14.3) - slop (3.4.6) - spoon (0.0.4) - ffi + rake (10.3.2) + rspec (3.0.0) + rspec-core (~> 3.0.0) + rspec-expectations (~> 3.0.0) + rspec-mocks (~> 3.0.0) + rspec-core (3.0.2) + rspec-support (~> 3.0.0) + rspec-expectations (3.0.2) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.0.0) + rspec-mocks (3.0.2) + rspec-support (~> 3.0.0) + rspec-support (3.0.2) + slop (3.5.0) zk (1.9.4) logging (~> 1.8.2) zookeeper (~> 1.4.0) zookeeper (1.4.8) PLATFORMS - java ruby DEPENDENCIES From 88d6437ba7255d38e2de9aa4de24fcd4d652a978 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sat, 7 Jun 2014 15:14:23 +0100 Subject: [PATCH 03/10] Split large method up, support recursion through subdirs --- lib/synapse/service_watcher/etcd.rb | 52 +++++++++++++++++++---------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index e40ab612..f38932d1 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -69,11 +69,43 @@ def create(path) @etcd.create(path, dir: true) end + def each_node(node) + begin + host, port, name = deserialize_service_instance(node.value) + rescue StandardError => e + log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}" + nil + else + server_port = @server_port_override ? @server_port_override : port + + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = node.key.split('/').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + + log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" + { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + end + end + + def each_dir(d) + new_backends = [] + d.children.each do |node| + if node.directory? + new_backends << each_dir(node) + else + backend = each_node(node) + if backend + new_backends << backend + end + end + end + new_backends.flatten + end + # find the current backends at the discovery path; sets @backends def discover log.info "synapse: discovering backends for service #{@name}" - new_backends = [] d = nil begin d = @etcd.get(@discovery['path']) @@ -82,23 +114,9 @@ def discover d = @etcd.get(@discovery['path']) end + new_backends = [] if d.directory? - d.children.each do |node| - begin - host, port, name = deserialize_service_instance(node.value) - rescue StandardError => e - log.error "synapse: invalid data in etcd node #{id} at #{@discovery['path']}: #{e}" - else - server_port = @server_port_override ? @server_port_override : port - - # find the numberic id in the node name; used for leader elections if enabled - numeric_id = node.key.split('/').last - numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil - - log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} - end - end + new_backends = each_dir(d) else log.warn "synapse: path #{@discovery['path']} is not a directory" end From 8b5e3fcce4d5ed2c7efa412a19bdc09c41ff34cc Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sat, 7 Jun 2014 15:30:35 +0100 Subject: [PATCH 04/10] Make recursion work --- lib/synapse/service_watcher/etcd.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index f38932d1..f4b43804 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -91,7 +91,7 @@ def each_dir(d) new_backends = [] d.children.each do |node| if node.directory? - new_backends << each_dir(node) + new_backends << each_dir(@etcd.get(node.key)) else backend = each_node(node) if backend From d653a11bd4303546ee69a7b00efce906266ad6fa Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sat, 7 Jun 2014 15:58:17 +0100 Subject: [PATCH 05/10] Make the watcher into a thread --- lib/synapse/service_watcher/etcd.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index f4b43804..992bc1df 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -35,7 +35,9 @@ def start # call the callback to bootstrap the process discover @synapse.reconfigure! - watch + @watcher = Thread.new do + watch + end end def stop From 6d43f99060c4cd135bbc3a4e81bdc65ed80ad5f7 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sat, 7 Jun 2014 17:38:32 +0100 Subject: [PATCH 06/10] Only reconfigure if new backends are discovered --- lib/synapse/service_watcher/etcd.rb | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index 992bc1df..c5f527ce 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -126,13 +126,21 @@ def discover if new_backends.empty? if @default_servers.empty? log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + false else log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" @backends = @default_servers + true end else log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - @backends = new_backends + if @backends == new_backends + log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}" + true + else + log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" + false + end end end @@ -142,8 +150,9 @@ def watch @etcd.watch(@discovery['path'], :timeout => 60, :recursive => true) rescue Timeout::Error else - discover - @synapse.reconfigure! + if discover + @synapse.reconfigure! + end end end end From ac57c2caa818f916b384787767c6552ed3775285 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sat, 7 Jun 2014 18:18:24 +0100 Subject: [PATCH 07/10] Fix bug stopping backends ever registering --- lib/synapse/service_watcher/etcd.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index c5f527ce..74e281d9 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -136,6 +136,7 @@ def discover log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" if @backends == new_backends log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}" + @backends = new_backends true else log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" From 276fcebd2d17019ae08f4c6f80f209951e9980f7 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Sat, 7 Jun 2014 18:21:46 +0100 Subject: [PATCH 08/10] Fix bug with logic being wrong way around --- lib/synapse/service_watcher/etcd.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index 74e281d9..45e72b99 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -133,8 +133,7 @@ def discover true end else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - if @backends == new_backends + if @backends != new_backends log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}" @backends = new_backends true From deedf4d1ba66a225217cff6e994309b97e45d47a Mon Sep 17 00:00:00 2001 From: Alan Smith Date: Thu, 14 Aug 2014 16:11:59 -0400 Subject: [PATCH 09/10] Switched EtcdWatcher to use the `hosts` parameter and rotate through the hosts until a reachable host is found. --- lib/synapse/service_watcher/etcd.rb | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index 45e72b99..d17b46c0 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -26,11 +26,15 @@ class EtcdWatcher < BaseWatcher NUMBERS_RE = /^\d+$/ def start - etcd_hosts = @discovery['host'] + @etcd_hosts = @discovery['hosts'].shuffle log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" @should_exit = false - @etcd = ::Etcd.client(:host => @discovery['host'], :port => @discovery['port']) + + host, port = @etcd_hosts[0].split(':') + host = host || '127.0.0.1' + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) # call the callback to bootstrap the process discover @@ -50,6 +54,22 @@ def stop end def ping? + etcd_connected? + end + + private + def etcd_connected? + unless @etcd.leader + @etcd_hosts.each do |h| + host, port = h.split(':') + host = host || '127.0.0.1' + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) + + break if @etcd.leader + end + end + @etcd.leader end From 8c51ed17a5fdaa99d381c27e071f3d21110ef739 Mon Sep 17 00:00:00 2001 From: Alan Smith Date: Fri, 15 Aug 2014 10:38:26 -0400 Subject: [PATCH 10/10] Switched to less resilient failover for etcd in favor of fewer connections made to etcd. --- lib/synapse/service_watcher/etcd.rb | 34 ++++++++++++----------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/lib/synapse/service_watcher/etcd.rb b/lib/synapse/service_watcher/etcd.rb index d17b46c0..e392f701 100644 --- a/lib/synapse/service_watcher/etcd.rb +++ b/lib/synapse/service_watcher/etcd.rb @@ -31,10 +31,20 @@ def start log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}" @should_exit = false - host, port = @etcd_hosts[0].split(':') - host = host || '127.0.0.1' - port = port || 4003 - @etcd = ::Etcd.client(:host => host, :port => port) + @etcd_hosts.each do |h| + host, port = h.split(':') + port = port || 4003 + @etcd = ::Etcd.client(:host => host, :port => port) + + connected = + begin + @etcd.leader + rescue + false + end + + break if connected + end # call the callback to bootstrap the process discover @@ -54,22 +64,6 @@ def stop end def ping? - etcd_connected? - end - - private - def etcd_connected? - unless @etcd.leader - @etcd_hosts.each do |h| - host, port = h.split(':') - host = host || '127.0.0.1' - port = port || 4003 - @etcd = ::Etcd.client(:host => host, :port => port) - - break if @etcd.leader - end - end - @etcd.leader end