diff --git a/node/node.go b/node/node.go index 24905c128e..12b69fc2fa 100644 --- a/node/node.go +++ b/node/node.go @@ -527,6 +527,12 @@ func (n *Node) stopRPC() { n.ws.stop() n.httpAuth.stop() n.wsAuth.stop() + + n.http.wait() + n.ws.wait() + n.httpAuth.wait() + n.wsAuth.wait() + n.ipc.stop() n.stopInProc() } diff --git a/node/rpcstack.go b/node/rpcstack.go index 6d3828ec2b..39997a6164 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -70,12 +70,13 @@ type httpServer struct { timeouts rpc.HTTPTimeouts mux http.ServeMux // registered handlers go here - mu sync.Mutex - server *http.Server - listener net.Listener // non-nil when server is running + mu sync.Mutex + server *http.Server + listener net.Listener // non-nil when server is running + ready bool + shutdownWG sync.WaitGroup // WG to wait for shutdown // HTTP RPC handler things. - httpConfig httpConfig httpHandler atomic.Value // *rpcHandler @@ -93,6 +94,9 @@ type httpServer struct { const ( shutdownTimeout = 5 * time.Second + // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped + // if readiness probe period is 5 seconds, this is enough time for health check to be triggered + stopPendingRequestTimeout = 7 * time.Second ) func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer { @@ -167,6 +171,7 @@ func (h *httpServer) start() error { } h.log.Info("WebSocket enabled", "url", url) } + h.ready = true // if server is websocket only, return after logging if !h.rpcAllowed() { return nil @@ -197,6 +202,22 @@ func (h *httpServer) start() error { } func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // server health probe endpoints + if r.Method == http.MethodGet { + // readiness probe fails during shutdown + if r.URL.Path == "/readyz" { + if h.ready { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + return + // liveness probe always succeeds + } else if r.URL.Path == "/livez" { + w.WriteHeader(http.StatusNoContent) + return + } + } // check if ws request and serve if ws enabled ws := h.wsHandler.Load().(*rpcHandler) if ws != nil && isWebsocket(r) { @@ -257,8 +278,20 @@ func validatePrefix(what, path string) error { // stop shuts down the HTTP server. func (h *httpServer) stop() { h.mu.Lock() - defer h.mu.Unlock() - h.doStop() + // unit test executes stop multiple times, so we cannot increment the WG in the start method + h.shutdownWG = sync.WaitGroup{} + h.shutdownWG.Add(1) + h.ready = false + time.AfterFunc(stopPendingRequestTimeout, func() { + defer h.mu.Unlock() + h.doStop() + h.shutdownWG.Done() + }) +} + +// wait waits for the server to shutdown. +func (h *httpServer) wait() { + h.shutdownWG.Wait() } func (h *httpServer) doStop() {