From 9bb3a1d9760ca4ad243b530d34c5170cf87c5f32 Mon Sep 17 00:00:00 2001
From: Nathan Johnson <njohnson@ena.com>
Date: Thu, 17 Nov 2022 19:15:54 -0600
Subject: [PATCH] Updating grpc handler to gracefully close backend connections

This should address part 1 of #807 and #912

update grpc docs
---
 config/config.go                              |  1 +
 config/default.go                             | 25 ++++----
 config/kvslice.go                             |  4 +-
 config/load.go                                |  1 +
 demo/server/server.go                         | 37 ++++++-----
 docs/content/ref/proxy.grpcmaxrxmsgsize.md    | 10 +++
 docs/content/ref/proxy.grpcmaxtxmsgsize.md    | 10 +++
 docs/content/ref/proxy.grpcshutdowntimeout.md |  9 +++
 fabio.properties                              |  6 ++
 logger/logger.go                              | 63 +++++++++----------
 proxy/grpc_handler.go                         | 26 +++++---
 proxy/http_headers.go                         |  1 -
 proxy/http_integration_test.go                |  2 +-
 registry/consul/register.go                   |  5 +-
 route/picker.go                               |  2 -
 route/picker_test.go                          |  2 +-
 route/table.go                                |  2 +-
 route/table_test.go                           |  2 +-
 18 files changed, 123 insertions(+), 85 deletions(-)
 create mode 100644 docs/content/ref/proxy.grpcmaxrxmsgsize.md
 create mode 100644 docs/content/ref/proxy.grpcmaxtxmsgsize.md
 create mode 100644 docs/content/ref/proxy.grpcshutdowntimeout.md

diff --git a/config/config.go b/config/config.go
index 484af7293..90637f6e4 100644
--- a/config/config.go
+++ b/config/config.go
@@ -93,6 +93,7 @@ type Proxy struct {
 	AuthSchemes           map[string]AuthScheme
 	GRPCMaxRxMsgSize      int
 	GRPCMaxTxMsgSize      int
+	GRPCGShutdownTimeout  time.Duration
 }
 
 type STSHeader struct {
diff --git a/config/default.go b/config/default.go
index de9f00594..011e9e315 100644
--- a/config/default.go
+++ b/config/default.go
@@ -42,18 +42,19 @@ var defaultConfig = &Config{
 		},
 	},
 	Proxy: Proxy{
-		MaxConn:             10000,
-		Strategy:            "rnd",
-		Matcher:             "prefix",
-		NoRouteStatus:       404,
-		DialTimeout:         30 * time.Second,
-		FlushInterval:       time.Second,
-		GlobalFlushInterval: 0,
-		LocalIP:             LocalIPString(),
-		AuthSchemes:         map[string]AuthScheme{},
-		IdleConnTimeout:     15 * time.Second,
-		GRPCMaxRxMsgSize:    4 * 1024 * 1024, // 4M
-		GRPCMaxTxMsgSize:    4 * 1024 * 1024, // 4M
+		MaxConn:              10000,
+		Strategy:             "rnd",
+		Matcher:              "prefix",
+		NoRouteStatus:        404,
+		DialTimeout:          30 * time.Second,
+		FlushInterval:        time.Second,
+		GlobalFlushInterval:  0,
+		LocalIP:              LocalIPString(),
+		AuthSchemes:          map[string]AuthScheme{},
+		IdleConnTimeout:      15 * time.Second,
+		GRPCMaxRxMsgSize:     4 * 1024 * 1024, // 4M
+		GRPCMaxTxMsgSize:     4 * 1024 * 1024, // 4M
+		GRPCGShutdownTimeout: time.Second * 2,
 	},
 	Registry: Registry{
 		Backend: "consul",
diff --git a/config/kvslice.go b/config/kvslice.go
index c9843e648..8ae2cbc79 100644
--- a/config/kvslice.go
+++ b/config/kvslice.go
@@ -8,7 +8,7 @@ import (
 
 // parseKVSlice parses a configuration string in the form
 //
-//   key=val;key=val,key=val;key=val
+//	key=val;key=val,key=val;key=val
 //
 // into a list of string maps. maps are separated by comma and key/value
 // pairs within a map are separated by semicolons. The first key/value
@@ -16,7 +16,7 @@ import (
 // empty key. This allows support of legacy configuration formats which
 // are
 //
-//   val;opt1=val1;opt2=val2;...
+//	val;opt1=val1;opt2=val2;...
 func parseKVSlice(in string) ([]map[string]string, error) {
 	var keyOrFirstVal string
 	maps := []map[string]string{}
diff --git a/config/load.go b/config/load.go
index f0e38397a..ce26057b0 100644
--- a/config/load.go
+++ b/config/load.go
@@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
 	f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
 	f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
 	f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
+	f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend")
 	f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
 	f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
 	f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")
diff --git a/demo/server/server.go b/demo/server/server.go
index d2ba0308d..6daa065f8 100644
--- a/demo/server/server.go
+++ b/demo/server/server.go
@@ -4,34 +4,33 @@
 //
 // During startup the server performs the following steps:
 //
-// * Add a handler for each prefix which provides a unique
-//   response for that instance and endpoint
-// * Add a `/health` handler for the consul health check
-// * Register the service in consul with the listen address,
-//   a health check under the given name and with one `urlprefix-`
-//   tag per prefix
-// * Install a signal handler to deregister the service on exit
+//   - Add a handler for each prefix which provides a unique
+//     response for that instance and endpoint
+//   - Add a `/health` handler for the consul health check
+//   - Register the service in consul with the listen address,
+//     a health check under the given name and with one `urlprefix-`
+//     tag per prefix
+//   - Install a signal handler to deregister the service on exit
 //
 // If the protocol is set to "ws" the registered endpoints function
 // as websocket echo servers.
 //
 // Example:
 //
-//   # http server
-//   ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
-//   ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
-//   ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
+//	# http server
+//	./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
+//	./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
+//	./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
 //
-//   # https server
-//   ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
-//   ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
+//	# https server
+//	./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
+//	./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
 //
-//   # websocket server
-//   ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
-//
-//   # tcp server
-//   ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
+//	# websocket server
+//	./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
 //
+//	# tcp server
+//	./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
 package main
 
 import (
diff --git a/docs/content/ref/proxy.grpcmaxrxmsgsize.md b/docs/content/ref/proxy.grpcmaxrxmsgsize.md
new file mode 100644
index 000000000..8135c2451
--- /dev/null
+++ b/docs/content/ref/proxy.grpcmaxrxmsgsize.md
@@ -0,0 +1,10 @@
+---
+title: "proxy.grpcmaxrxmsgsize"
+---
+
+`proxy.grpcmaxrxmsgsize` configures the grpc max receive message size in bytes.  The default
+value is
+
+    proxy.grpcmaxrxmsgsize = 4194304
+
+which is 4MB
diff --git a/docs/content/ref/proxy.grpcmaxtxmsgsize.md b/docs/content/ref/proxy.grpcmaxtxmsgsize.md
new file mode 100644
index 000000000..c7e6ab4b4
--- /dev/null
+++ b/docs/content/ref/proxy.grpcmaxtxmsgsize.md
@@ -0,0 +1,10 @@
+---
+title: "proxy.grpcmaxtxmsgsize"
+---
+
+`proxy.grpcmaxtxmsgsize` configures the grpc max transmit message size in bytes.  The default
+value is
+
+    proxy.grpcmaxtxmsgsize = 4194304
+
+which is 4MB
diff --git a/docs/content/ref/proxy.grpcshutdowntimeout.md b/docs/content/ref/proxy.grpcshutdowntimeout.md
new file mode 100644
index 000000000..12e17435e
--- /dev/null
+++ b/docs/content/ref/proxy.grpcshutdowntimeout.md
@@ -0,0 +1,9 @@
+---
+title: "proxy.grpcshutdowntimeout"
+---
+
+`proxy.grpcshutdowntimeout` configures the amount of time fabio will wait to attempt
+to close the connection while waiting for grpc traffic to finish to a backend that's been
+deregistered.  The default value is
+
+    proxy.grpcshutdowntimeout = 2s
diff --git a/fabio.properties b/fabio.properties
index 74a0cb1a2..92dd55253 100644
--- a/fabio.properties
+++ b/fabio.properties
@@ -561,6 +561,12 @@
 # The default is
 # proxy.grpcmaxtxmsgsize = 4194304
 #
+#
+# proxy.grpcshutdowntimeout configures the amount of time fabio will wait to attempt
+# to close the connection while waiting for grpc traffic to finish to a backend that's been
+# deregistered.  Default value is
+# proxy.grpcshutdowntimeout = 2s
+# setting to 0s disables the wait.
 
 # log.access.format configures the format of the access log.
 #
diff --git a/logger/logger.go b/logger/logger.go
index b9077320e..01361d2f1 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -5,38 +5,37 @@
 // takes place. Text between two fields is printed verbatim. See the common
 // log file formats for an example.
 //
-//   $header.<name>           - request http header (name: [a-zA-Z0-9-]+)
-//   $remote_addr             - host:port of remote client
-//   $remote_host             - host of remote client
-//   $remote_port             - port of remote client
-//   $request                 - request <method> <uri> <proto>
-//   $request_args            - request query parameters
-//   $request_host            - request host header (aka server name)
-//   $request_method          - request method
-//   $request_scheme          - request scheme
-//   $request_uri             - request URI
-//   $request_url             - request URL
-//   $request_proto           - request protocol
-//   $response_body_size      - response body size in bytes
-//   $response_status         - response status code
-//   $response_time_ms        - response time in S.sss format
-//   $response_time_us        - response time in S.ssssss format
-//   $response_time_ns        - response time in S.sssssssss format
-//   $time_rfc3339            - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
-//   $time_rfc3339_ms         - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
-//   $time_rfc3339_us         - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
-//   $time_rfc3339_ns         - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
-//   $time_unix_ms            - log timestamp in unix epoch ms
-//   $time_unix_us            - log timestamp in unix epoch us
-//   $time_unix_ns            - log timestamp in unix epoch ns
-//   $time_common             - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
-//   $upstream_addr           - host:port of upstream server
-//   $upstream_host           - host of upstream server
-//   $upstream_port           - port of upstream server
-//   $upstream_request_scheme - upstream request scheme
-//   $upstream_request_uri    - upstream request URI
-//   $upstream_request_url    - upstream request URL
-//
+//	$header.<name>           - request http header (name: [a-zA-Z0-9-]+)
+//	$remote_addr             - host:port of remote client
+//	$remote_host             - host of remote client
+//	$remote_port             - port of remote client
+//	$request                 - request <method> <uri> <proto>
+//	$request_args            - request query parameters
+//	$request_host            - request host header (aka server name)
+//	$request_method          - request method
+//	$request_scheme          - request scheme
+//	$request_uri             - request URI
+//	$request_url             - request URL
+//	$request_proto           - request protocol
+//	$response_body_size      - response body size in bytes
+//	$response_status         - response status code
+//	$response_time_ms        - response time in S.sss format
+//	$response_time_us        - response time in S.ssssss format
+//	$response_time_ns        - response time in S.sssssssss format
+//	$time_rfc3339            - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
+//	$time_rfc3339_ms         - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
+//	$time_rfc3339_us         - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
+//	$time_rfc3339_ns         - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
+//	$time_unix_ms            - log timestamp in unix epoch ms
+//	$time_unix_us            - log timestamp in unix epoch us
+//	$time_unix_ns            - log timestamp in unix epoch ns
+//	$time_common             - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
+//	$upstream_addr           - host:port of upstream server
+//	$upstream_host           - host of upstream server
+//	$upstream_port           - port of upstream server
+//	$upstream_request_scheme - upstream request scheme
+//	$upstream_request_uri    - upstream request URI
+//	$upstream_request_url    - upstream request URL
 package logger
 
 import (
diff --git a/proxy/grpc_handler.go b/proxy/grpc_handler.go
index 1c4be8b09..ec0e2b20f 100644
--- a/proxy/grpc_handler.go
+++ b/proxy/grpc_handler.go
@@ -165,13 +165,13 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string)
 	return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.GlobCache, g.Config.GlobMatchingDisabled), nil
 }
 
-//grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest
-//the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc
-//the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path',
-//a route record will be setup in route Table, t['betatest']
-//the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used.
-//if t[dstHost] not exists, fallback to t[""] is used
-//dstHost will be "" as before if not specified by grpc client side.
+// grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest
+// the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc
+// the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path',
+// a route record will be setup in route Table, t['betatest']
+// the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used.
+// if t[dstHost] not exists, fallback to t[""] is used
+// dstHost will be "" as before if not specified by grpc client side.
 func (g GrpcProxyInterceptor) getDestinationHostFromMetadata(md metadata.MD) (dstHost string) {
 	dstHost = ""
 	hosts := md["dsthost"]
@@ -299,15 +299,21 @@ func (p *grpcConnectionPool) cleanup() {
 		p.lock.Lock()
 		table := route.GetTable()
 		for tKey, cs := range p.connections {
-			if cs.GetState() == connectivity.Shutdown {
+			state := cs.GetState()
+			if state == connectivity.Shutdown {
 				delete(p.connections, tKey)
 				continue
 			}
 
 			if !hasTarget(tKey, table) {
 				log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
-				cs.Close()
-				delete(p.connections, tKey)
+				go func(cs *grpc.ClientConn, state connectivity.State) {
+					ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout)
+					defer cancel()
+					// wait for state to change, or timeout, before closing, in case it's still handling traffic.
+					cs.WaitForStateChange(ctx, state)
+					cs.Close()
+				}(cs, state)
 			}
 		}
 		p.lock.Unlock()
diff --git a/proxy/http_headers.go b/proxy/http_headers.go
index db5aa05b5..7f610c99c 100644
--- a/proxy/http_headers.go
+++ b/proxy/http_headers.go
@@ -33,7 +33,6 @@ func addResponseHeaders(w http.ResponseWriter, r *http.Request, cfg config.Proxy
 // * add X-Real-Ip, if not present
 // * ClientIPHeader != "": Set header with that name to <remote ip>
 // * TLS connection: Set header with name from `cfg.TLSHeader` to `cfg.TLSHeaderValue`
-//
 func addHeaders(r *http.Request, cfg config.Proxy, stripPath string) error {
 	remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
 	if err != nil {
diff --git a/proxy/http_integration_test.go b/proxy/http_integration_test.go
index fa081bd3c..45c88e42e 100644
--- a/proxy/http_integration_test.go
+++ b/proxy/http_integration_test.go
@@ -33,7 +33,7 @@ const (
 	globDisabled = true
 )
 
-//Global GlobCache for Testing
+// Global GlobCache for Testing
 var globCache = route.NewGlobCache(1000)
 
 func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) {
diff --git a/registry/consul/register.go b/registry/consul/register.go
index 89fd65778..7596431ae 100644
--- a/registry/consul/register.go
+++ b/registry/consul/register.go
@@ -26,9 +26,8 @@ const (
 // consul. To wait for completion the caller should read the next value from
 // the dereg channel.
 //
-//    dereg <- true // trigger deregistration
-//    <-dereg       // wait for completion
-//
+//	dereg <- true // trigger deregistration
+//	<-dereg       // wait for completion
 func register(c *api.Client, service *api.AgentServiceRegistration) chan bool {
 	registered := func(serviceID string) bool {
 		if serviceID == "" {
diff --git a/route/picker.go b/route/picker.go
index 7828d7ad1..888b46270 100644
--- a/route/picker.go
+++ b/route/picker.go
@@ -41,5 +41,3 @@ var randIntn = func(n int) int {
 	}
 	return rand.Intn(n)
 }
-
-
diff --git a/route/picker_test.go b/route/picker_test.go
index 42ebd3d01..fe4434797 100644
--- a/route/picker_test.go
+++ b/route/picker_test.go
@@ -65,7 +65,7 @@ var oldRandInt = func(n int) int {
 	if n == 0 {
 		return 0
 	}
-	return int(time.Now().UnixNano()/int64(time.Microsecond) % int64(n))
+	return int(time.Now().UnixNano() / int64(time.Microsecond) % int64(n))
 }
 
 var result int // prevent compiler optimization
diff --git a/route/table.go b/route/table.go
index 239dd2ea8..1e6c0369f 100644
--- a/route/table.go
+++ b/route/table.go
@@ -72,7 +72,7 @@ func SetTable(t Table) {
 type Table map[string]Routes
 
 // hostpath splits a 'host/path' prefix into 'host' and '/path' or it returns a
-// ':port' prefix as ':port' and '' since there is no path component for TCP
+// ':port' prefix as ':port' and ” since there is no path component for TCP
 // connections.
 func hostpath(prefix string) (host string, path string) {
 	if strings.HasPrefix(prefix, ":") {
diff --git a/route/table_test.go b/route/table_test.go
index 934581460..72080b012 100644
--- a/route/table_test.go
+++ b/route/table_test.go
@@ -19,7 +19,7 @@ const (
 	globDisabled = true
 )
 
-//Global GlobCache for Testing
+// Global GlobCache for Testing
 var globCache = NewGlobCache(1000)
 
 func TestTableParse(t *testing.T) {