Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Commit 365565e

Browse files
author
Dongsu Park
committed
Merge pull request #1673 from endocode/dongsu/grpc-fix-build-clientconn
registry/rpc: use simpleBalancer instead of ClientConn.State()
2 parents 774ac31 + ecb121a commit 365565e

File tree

66 files changed

+13025
-3148
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+13025
-3148
lines changed

engine/rpcengine.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,12 @@ func rpcAcquireLeadership(reg registry.Registry, lManager lease.Manager, machID
110110
return l
111111
}
112112

113-
if existing != nil && existing.Version() >= ver {
113+
// If reg is not ready, we have to give it an opportunity to steal the lease
114+
// below. Otherwise it could be blocked forever by the existing engine leader,
115+
// which could cause the gRPC registry to always fail when a leader already exists.
116+
// Thus we return the existing leader, only if reg.IsRegistryReady() == true.
117+
// TODO(dpark): refactor the entire function for better readability. - 20160908
118+
if (existing != nil && existing.Version() >= ver) && reg.IsRegistryReady() {
114119
log.Debugf("Lease already held by Machine(%s) operating at acceptable version %d", existing.MachineID(), existing.Version())
115120
return existing
116121
}

glide.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glide.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,4 @@ import:
3535
- package: google.golang.org/api
3636
subpackages:
3737
- googleapi
38+
- package: github.com/spf13/viper

protobuf/fleet.pb.go

Lines changed: 699 additions & 1548 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

registry/rpc/balancer.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2016 The fleet Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package rpc
16+
17+
import (
	"errors"
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)
24+
25+
// simpleBalancer is a deliberately minimal implementation of the
// grpc.Balancer interface, intended to be used only by fleet.
//
// In principle a grpc.Balancer is meant to handle load balancing of RPCs
// across multiple connections, one per address:
//   - Start() does the initialization work needed to bootstrap a Balancer.
//   - Up() informs the Balancer that gRPC has a connection to the server at
//     addr. It returns a Down() callback which is called once the connection
//     to addr gets lost or closed.
//   - Get() gets the address of a server for the RPC corresponding to ctx.
//   - Notify() returns a channel that is used by gRPC internals to watch the
//     addresses gRPC needs to connect to.
//   - Close() shuts down the balancer.
//
// fleet, however, only needs to care about a single connection, so this type
// is kept as small as possible. Its one crucial feature is readyc, a plain
// channel that tells the RPC registry that the connection has become
// available. readyc is closed in Up(), which lets, for example,
// IsRegistryReady() recognize that the connection is available. No value is
// ever sent on readyc; only its being closed matters.
type simpleBalancer struct {
	// addrs holds the endpoints given to newSimpleBalancer.
	addrs []string
	// numGets is a counter that is advanced atomically by Get().
	numGets uint32

	// readyc is closed once the first connection is up.
	readyc chan struct{}
	// readyOnce makes sure readyc is closed at most once.
	readyOnce sync.Once
}

// newSimpleBalancer returns a simpleBalancer for the endpoints eps. The
// returned balancer keeps eps as-is (the slice is not copied) and starts with
// an open readyc, i.e. in the "no connection yet" state.
func newSimpleBalancer(eps []string) *simpleBalancer {
	b := new(simpleBalancer)
	b.addrs = eps
	b.readyc = make(chan struct{})
	return b
}
60+
61+
// Start is the bootstrap hook of the balancer. simpleBalancer has nothing to
// initialize here, so target is ignored and the call always succeeds.
func (b *simpleBalancer) Start(target string) error {
	return nil
}
62+
63+
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
64+
b.readyOnce.Do(func() { close(b.readyc) })
65+
return func(error) {}
66+
}
67+
68+
func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
69+
v := atomic.AddUint32(&b.numGets, 1)
70+
addr := b.addrs[v%uint32(len(b.addrs))]
71+
72+
return grpc.Address{Addr: addr}, func() {}, nil
73+
}
74+
75+
// Notify always returns a nil channel: simpleBalancer never publishes address
// updates.
// NOTE(review): presumably gRPC then falls back to the target passed to
// Dial — confirm against the grpc.Balancer documentation.
func (b *simpleBalancer) Notify() <-chan []grpc.Address {
	return nil
}
76+
77+
func (b *simpleBalancer) Close() error {
78+
b.readyc = make(chan struct{})
79+
return nil
80+
}

registry/rpc/rpcregistry.go

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,14 @@ import (
3232
"github.com/coreos/fleet/unit"
3333
)
3434

35-
const (
36-
grpcConnectionTimeout = 5000 * time.Millisecond
37-
38-
grpcConnectionStateReady = "READY"
39-
grpcConnectionStateConnecting = "CONNECTING"
40-
grpcConnectionStateShutdown = "SHUTDOWN"
41-
grpcConnectionStateFailure = "TRANSIENT_FAILURE"
42-
)
43-
4435
var DebugRPCRegistry bool = false
4536

4637
type RPCRegistry struct {
4738
dialer func(addr string, timeout time.Duration) (net.Conn, error)
4839
mu *sync.Mutex
4940
registryClient pb.RegistryClient
5041
registryConn *grpc.ClientConn
42+
balancer *simpleBalancer
5143
}
5244

5345
func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry {
@@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context {
6355
}
6456

6557
func (r *RPCRegistry) getClient() pb.RegistryClient {
66-
for {
67-
st, err := r.registryConn.State()
68-
if err != nil {
69-
log.Fatalf("Unable to get the state of rpc connection: %v", err)
70-
}
71-
state := st.String()
72-
if state == grpcConnectionStateReady {
73-
break
74-
} else if state == grpcConnectionStateConnecting {
75-
if DebugRPCRegistry {
76-
log.Infof("gRPC connection state: %s", state)
77-
}
78-
continue
79-
} else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown {
80-
log.Infof("gRPC connection state '%s' reports an error in the connection", state)
81-
log.Info("Reconnecting gRPC peer to fleet-engine...")
82-
r.Connect()
83-
}
84-
85-
time.Sleep(grpcConnectionTimeout)
86-
}
87-
8858
return r.registryClient
8959
}
9060

9161
func (r *RPCRegistry) Connect() {
9262
// We want the connection operation to block and constantly reconnect using grpc backoff
9363
log.Info("Starting gRPC connection to fleet-engine...")
94-
connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock())
64+
ep_engines := []string{":fleet-engine:"}
65+
r.balancer = newSimpleBalancer(ep_engines)
66+
connection, err := grpc.Dial(ep_engines[0],
67+
grpc.WithTimeout(12*time.Second), grpc.WithInsecure(),
68+
grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer))
9569
if err != nil {
9670
log.Fatalf("Unable to dial to registry: %s", err)
9771
}
@@ -106,24 +80,22 @@ func (r *RPCRegistry) Close() {
10680

10781
func (r *RPCRegistry) IsRegistryReady() bool {
10882
if r.registryConn != nil {
109-
st, err := r.registryConn.State()
110-
if err != nil {
111-
log.Fatalf("Unable to get the state of rpc connection: %v", err)
112-
}
113-
connState := st.String()
114-
log.Infof("Registry connection state: %s", connState)
115-
if connState != grpcConnectionStateReady {
116-
log.Errorf("unable to connect to registry connection state: %s", connState)
117-
return false
83+
hasConn := false
84+
if r.balancer != nil {
85+
select {
86+
case <-r.balancer.readyc:
87+
hasConn = true
88+
}
11889
}
119-
log.Infof("Getting server status...")
90+
12091
status, err := r.Status()
12192
if err != nil {
12293
log.Errorf("unable to get the status of the registry service %v", err)
12394
return false
12495
}
125-
log.Infof("Status of rpc service: %d, connection state: %s", status, connState)
126-
return status == pb.HealthCheckResponse_SERVING && err == nil
96+
log.Infof("Status of rpc service: %d, balancer has a connection: %t", status, hasConn)
97+
98+
return hasConn && status == pb.HealthCheckResponse_SERVING && err == nil
12799
}
128100
return false
129101
}

registry/rpc/rpcregistry_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) {
3434
t.Fatalf("failed to parse listener address: %v", err)
3535
}
3636
addr := "localhost:" + port
37-
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock())
37+
b := newSimpleBalancer([]string{addr})
38+
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second),
39+
grpc.WithBlock(), grpc.WithBalancer(b))
3840
if err != nil {
3941
t.Fatalf("failed to dial to the server %q: %v", addr, err)
4042
}

vendor/github.com/coreos/go-systemd/activation/listeners.go

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)