Skip to content

Commit

Permalink
add comment to clarify the etcd shutting down workflow
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 21, 2025
1 parent 79f3417 commit e89357b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 9 deletions.
57 changes: 50 additions & 7 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,23 @@ type Etcd struct {

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
cfg Config

// closeOnce is to ensure `stopc` is closed only once, no matter
// how many times the Close() method is called.
closeOnce sync.Once
wg sync.WaitGroup
// stopc is used to notify the sub goroutines not to send
// any errors to `errc`.
stopc chan struct{}

// errc is used to receive error from sub goroutines (including
// client handler, peer handler and metrics handler). It's closed
// after all these sub goroutines exit (checked via `wg`). Writers
// should avoid writing after `stopc` is closed by selecting on
// reading from `stopc`.
errc chan error
// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
wg sync.WaitGroup
}

type peerListener struct {
Expand Down Expand Up @@ -388,6 +399,24 @@ func (e *Etcd) Config() Config {
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaning requests be closed immediately.
//
// The rough workflow to shut down etcd:
// 1. close the `stopc` channel, so that all error handlers (child
// goroutines) won't send back any errors anymore;
// 2. close all client and metrics listeners, so that etcd server
// stops receiving any new connection immediately;
// 3. stop the http and grpc servers gracefully, within request timeout;
// 4. call the cancel function to close the gateway context, so that
// all gateway connections are closed.
// 5. stop etcd server gracefully, and ensure the main raft loop
// goroutine is stopped;
// 6. stop all peer listeners, so that it stops receiving peer connections
// and messages (wait up to 1-second);
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
// and metrics handlers) to exit;
// 8. close the `errc` channel to release the resource. Note that it's only
// safe to close the `errc` after step 7 above is done, otherwise the
// child goroutines may send errors back to already closed `errc` channel.
func (e *Etcd) Close() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
Expand All @@ -407,10 +436,14 @@ func (e *Etcd) Close() {
lg.Sync()
}()

// 1. close the `stopc` channel, so that all error handlers (child
// goroutines) won't send back any errors anymore;
e.closeOnce.Do(func() {
close(e.stopc)
})

// 2. close all client and metrics listeners, so that etcd server
// stops receiving any new connection immediately;
for i := range e.Clients {
if e.Clients[i] != nil {
e.Clients[i].Close()
Expand All @@ -421,7 +454,7 @@ func (e *Etcd) Close() {
e.metricsListeners[i].Close()

Check warning on line 454 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L454

Added line #L454 was not covered by tests
}

// close client requests with request timeout
// 3. stop the http and grpc servers gracefully, within request timeout;
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
Expand All @@ -434,6 +467,8 @@ func (e *Etcd) Close() {
}
}

// 4. call the cancel function to close the gateway context, so that
// all gateway connections are closed.
for _, sctx := range e.sctxs {
sctx.cancel()
}
Expand All @@ -443,12 +478,14 @@ func (e *Etcd) Close() {
e.tracingExporterShutdown()
}

// close rafthttp transports
// 5. stop etcd server gracefully, and ensure the main raft loop
// goroutine is stopped;
if e.Server != nil {
e.Server.Stop()
}

// close all idle connections in peer handler (wait up to 1-second)
// 6. stop all peer listeners, so that it stops receiving peer connections
// and messages (wait up to 1-second);
for i := range e.Peers {
if e.Peers[i] != nil && e.Peers[i].close != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand All @@ -457,7 +494,13 @@ func (e *Etcd) Close() {
}
}
if e.errc != nil {
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
// and metrics handlers) to exit;
e.wg.Wait()

// 8. close the `errc` channel to release the resource. Note that it's only
// safe to close the `errc` after step 7 above is done, otherwise the
// child goroutines may send errors back to already closed `errc` channel.
close(e.errc)
}
}
Expand Down
12 changes: 10 additions & 2 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,22 @@ type serveCtx struct {
insecure bool
httpOnly bool

// ctx is used to control the grpc gateway. Terminate the grpc gateway
// by calling `cancel` when shutting down the etcd.
ctx context.Context
cancel context.CancelFunc

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once

// serversC is used to receive the http and grpc server objects (created
// in `serve`), both of which will be closed when shutting down the etcd.
// Close it when `serve` returns or when etcd fails to bootstrap.
serversC chan *servers
// closeOnce is to ensure `serversC` is closed only once.
closeOnce sync.Once

// wg is used to track the lifecycle of all sub goroutines created by `serve`.
wg sync.WaitGroup
}

Expand Down

0 comments on commit e89357b

Please sign in to comment.