88 "net"
99 "net/http"
1010 "sync"
11+ "time"
1112
1213 "github.com/go-kit/log"
1314 "github.com/go-kit/log/level"
@@ -90,12 +91,6 @@ func (l *HTTP2Listener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
9091 return
9192 }
9293
93- flusher , ok := w .(http.Flusher )
94- if ! ok {
95- http .Error (w , "streaming not supported" , http .StatusInternalServerError )
96- return
97- }
98-
9994 if r .ProtoMajor != 2 {
10095 http .Error (w , "codec not supported" , http .StatusHTTPVersionNotSupported )
10196 return
@@ -112,7 +107,21 @@ func (l *HTTP2Listener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
112107 http .Error (w , "invalid remote addr" , http .StatusBadRequest )
113108 return
114109 }
115- conn := newHTTP2Conn (l .Addr (), remoteAddr , r .Body , w , flusher , l .codec )
110+
111+ rc := http .NewResponseController (w )
112+ err = rc .SetWriteDeadline (time.Time {})
113+ if err != nil {
114+ http .Error (w , "failed to set write deadline" , http .StatusInternalServerError )
115+ return
116+ }
117+
118+ err = rc .SetReadDeadline (time.Time {})
119+ if err != nil {
120+ http .Error (w , "failed to set read deadline" , http .StatusInternalServerError )
121+ return
122+ }
123+
124+ conn := newHTTP2Conn (l .Addr (), remoteAddr , r .Body , w , rc , l .codec )
116125 defer conn .Close ()
117126
118127 incomingConn := & incomingHTTP2Conn {conn : conn , w : w }
@@ -143,7 +152,10 @@ func (l *HTTP2Listener) Accept(ctx context.Context) (Conn, error) {
143152 return nil , net .ErrClosed
144153 case incomingConn := <- l .connCh :
145154 incomingConn .w .WriteHeader (http .StatusOK )
146- incomingConn .conn .flusher .Flush ()
155+ err := incomingConn .conn .responseController .Flush ()
156+ if err != nil {
157+ return nil , fmt .Errorf ("flush response: %w" , err )
158+ }
147159 return incomingConn .conn , nil
148160 }
149161}
@@ -166,11 +178,11 @@ type http2Conn struct {
166178 localAddr net.Addr
167179 remoteAddr net.Addr
168180
169- codec * protobufCodec
170- reader io.ReadCloser
171- writer io.Writer
172- flusher http.Flusher
173- cleanup func () // Optional cleanup function
181+ codec * protobufCodec
182+ reader io.ReadCloser
183+ writer io.Writer
184+ responseController * http.ResponseController
185+ cleanup func () // Optional cleanup function
174186
175187 writeMu sync.Mutex
176188 closeOnce sync.Once
@@ -192,18 +204,18 @@ func newHTTP2Conn(
192204 remoteAddr net.Addr ,
193205 reader io.ReadCloser ,
194206 writer io.Writer ,
195- flusher http.Flusher ,
207+ responseController * http.ResponseController ,
196208 codec * protobufCodec ,
197209) * http2Conn {
198210 c := & http2Conn {
199- localAddr : localAddr ,
200- remoteAddr : remoteAddr ,
201- codec : codec ,
202- reader : reader ,
203- writer : writer ,
204- flusher : flusher ,
205- closed : make (chan struct {}),
206- incomingCh : make (chan incomingFrame ),
211+ localAddr : localAddr ,
212+ remoteAddr : remoteAddr ,
213+ codec : codec ,
214+ reader : reader ,
215+ writer : writer ,
216+ responseController : responseController ,
217+ closed : make (chan struct {}),
218+ incomingCh : make (chan incomingFrame ),
207219 }
208220 return c
209221}
@@ -240,8 +252,11 @@ func (c *http2Conn) Send(ctx context.Context, frame Frame) error {
240252 }
241253
242254 // Flush after each frame to ensure immediate delivery
243- if c .flusher != nil {
244- c .flusher .Flush ()
255+ if c .responseController != nil {
256+ err := c .responseController .Flush ()
257+ if err != nil {
258+ return fmt .Errorf ("flush response: %w" , err )
259+ }
245260 }
246261
247262 return nil
@@ -341,7 +356,7 @@ func (d *HTTP2Dialer) Dial(ctx context.Context, from, to net.Addr) (Conn, error)
341356 to ,
342357 resp .Body ,
343358 pw ,
344- nil , // client doesn't need flusher , it's handled by the pipe writer
359+ nil , // client doesn't need responseController , it's handled by the pipe writer
345360 d .codec ,
346361 )
347362
0 commit comments