Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@ func (n *Node) stopRPC() {
n.ws.stop()
n.httpAuth.stop()
n.wsAuth.stop()

n.http.wait()
n.ws.wait()
n.httpAuth.wait()
n.wsAuth.wait()

n.ipc.stop()
n.stopInProc()
}
Expand Down
45 changes: 39 additions & 6 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ type httpServer struct {
timeouts rpc.HTTPTimeouts
mux http.ServeMux // registered handlers go here

mu sync.Mutex
server *http.Server
listener net.Listener // non-nil when server is running
mu sync.Mutex
server *http.Server
listener net.Listener // non-nil when server is running
ready bool
shutdownWG sync.WaitGroup // WG to wait for shutdown

// HTTP RPC handler things.

httpConfig httpConfig
httpHandler atomic.Value // *rpcHandler

Expand All @@ -93,6 +94,9 @@ type httpServer struct {

const (
shutdownTimeout = 5 * time.Second
// give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
// if readiness probe period is 5 seconds, this is enough time for health check to be triggered
stopPendingRequestTimeout = 7 * time.Second
)

func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer {
Expand Down Expand Up @@ -167,6 +171,7 @@ func (h *httpServer) start() error {
}
h.log.Info("WebSocket enabled", "url", url)
}
h.ready = true
// if server is websocket only, return after logging
if !h.rpcAllowed() {
return nil
Expand Down Expand Up @@ -197,6 +202,22 @@ func (h *httpServer) start() error {
}

func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// server health probe endpoints
if r.Method == http.MethodGet {
// readiness probe fails during shutdown
if r.URL.Path == "/readyz" {
if h.ready {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
return
// liveness probe always succeeds
} else if r.URL.Path == "/livez" {
w.WriteHeader(http.StatusNoContent)
return
}
}
// check if ws request and serve if ws enabled
ws := h.wsHandler.Load().(*rpcHandler)
if ws != nil && isWebsocket(r) {
Expand Down Expand Up @@ -257,8 +278,20 @@ func validatePrefix(what, path string) error {
// stop shuts down the HTTP server.
func (h *httpServer) stop() {
h.mu.Lock()
defer h.mu.Unlock()
h.doStop()
// unit test executes stop multiple times, so we cannot increment the WG in the start method
h.shutdownWG = sync.WaitGroup{}
h.shutdownWG.Add(1)
h.ready = false
time.AfterFunc(stopPendingRequestTimeout, func() {
defer h.mu.Unlock()
h.doStop()
h.shutdownWG.Done()
})
}

// wait waits for the server to shutdown.
func (h *httpServer) wait() {
h.shutdownWG.Wait()
}

func (h *httpServer) doStop() {
Expand Down