Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@ func (n *Node) stopRPC() {
n.ws.stop()
n.httpAuth.stop()
n.wsAuth.stop()

n.http.wait()
n.ws.wait()
n.httpAuth.wait()
n.wsAuth.wait()

n.ipc.stop()
n.stopInProc()
}
Expand Down
43 changes: 40 additions & 3 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type httpServer struct {
mu sync.Mutex
server *http.Server
listener net.Listener // non-nil when server is running
ready bool
shutdown chan struct{} // Channel to wait for termination notifications

// HTTP RPC handler things.

httpConfig httpConfig
httpHandler atomic.Value // *rpcHandler

Expand All @@ -93,6 +94,9 @@ type httpServer struct {

const (
shutdownTimeout = 5 * time.Second
// give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
// if readiness probe period is 5 seconds, this is enough time for health check to be triggered
stopPendingRequestTimeout = 7 * time.Second
)

func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer {
Expand Down Expand Up @@ -137,6 +141,7 @@ func (h *httpServer) start() error {
if h.endpoint == "" || h.listener != nil {
return nil // already running or not configured
}
h.shutdown = make(chan struct{})

// Initialize the server.
h.server = &http.Server{Handler: h}
Expand Down Expand Up @@ -167,6 +172,7 @@ func (h *httpServer) start() error {
}
h.log.Info("WebSocket enabled", "url", url)
}
h.ready = true
// if server is websocket only, return after logging
if !h.rpcAllowed() {
return nil
Expand Down Expand Up @@ -197,6 +203,20 @@ func (h *httpServer) start() error {
}

func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Permit dumb empty requests for remote health-checks (AWS)
if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
if r.URL.Path == "/readyz" {
if h.ready {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
return
} else if r.URL.Path == "/livez" {
w.WriteHeader(http.StatusNoContent)
return
}
}
// check if ws request and serve if ws enabled
ws := h.wsHandler.Load().(*rpcHandler)
if ws != nil && isWebsocket(r) {
Expand Down Expand Up @@ -257,8 +277,25 @@ func validatePrefix(what, path string) error {
// stop shuts down the HTTP server.
func (h *httpServer) stop() {
h.mu.Lock()
defer h.mu.Unlock()
h.doStop()
h.ready = false
time.AfterFunc(stopPendingRequestTimeout, func() {
defer h.mu.Unlock()
h.doStop()
if h.shutdown != nil {
func() {
// Avoid panic when closing closed channel
defer func() { recover() }()
close(h.shutdown)
}()
}
})
}

// ShutdownWait waits for the server to shutdown.
func (h *httpServer) wait() {
if h.shutdown != nil {
<-h.shutdown
}
}

func (h *httpServer) doStop() {
Expand Down