From 4a63adf054f1ce9b2d3ce379004c3306ebb9e615 Mon Sep 17 00:00:00 2001 From: Simon Menke Date: Wed, 4 May 2016 20:12:27 +0200 Subject: [PATCH] Fixed deps --- .../golang/protobuf/proto/pointer_reflect.go | 2 +- .../golang/protobuf/proto/pointer_unsafe.go | 2 +- .../golang/protobuf/proto/properties.go | 8 +- .../golang/protobuf/proto/text_parser.go | 3 +- vendor/golang.org/x/net/context/context.go | 2 +- vendor/google.golang.org/grpc/backoff.go | 18 +-- vendor/google.golang.org/grpc/clientconn.go | 14 +- vendor/google.golang.org/grpc/interceptor.go | 74 ---------- vendor/google.golang.org/grpc/rpc_util.go | 4 +- vendor/google.golang.org/grpc/server.go | 42 +----- vendor/google.golang.org/grpc/stream.go | 8 +- .../grpc/transport/control.go | 86 +++++++++-- .../grpc/transport/http2_client.go | 138 ++++++++---------- .../grpc/transport/http2_server.go | 82 ++++------- vendor/vendor.json | 52 +++---- 15 files changed, 209 insertions(+), 326 deletions(-) delete mode 100644 vendor/google.golang.org/grpc/interceptor.go diff --git a/vendor/github.com/golang/protobuf/proto/pointer_reflect.go b/vendor/github.com/golang/protobuf/proto/pointer_reflect.go index 9899141..749919d 100644 --- a/vendor/github.com/golang/protobuf/proto/pointer_reflect.go +++ b/vendor/github.com/golang/protobuf/proto/pointer_reflect.go @@ -29,7 +29,7 @@ // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// +build appengine js +// +build appengine // This file contains an implementation of proto field accesses using package reflect. // It is slower than the code in pointer_unsafe.go but it avoids package unsafe and can diff --git a/vendor/github.com/golang/protobuf/proto/pointer_unsafe.go b/vendor/github.com/golang/protobuf/proto/pointer_unsafe.go index ceece77..e9be0fe 100644 --- a/vendor/github.com/golang/protobuf/proto/pointer_unsafe.go +++ b/vendor/github.com/golang/protobuf/proto/pointer_unsafe.go @@ -29,7 +29,7 @@ // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// +build !appengine,!js +// +build !appengine // This file contains the implementation of the proto field accesses using package unsafe. diff --git a/vendor/github.com/golang/protobuf/proto/properties.go b/vendor/github.com/golang/protobuf/proto/properties.go index 880eb22..4fe2ec2 100644 --- a/vendor/github.com/golang/protobuf/proto/properties.go +++ b/vendor/github.com/golang/protobuf/proto/properties.go @@ -701,11 +701,7 @@ func getPropertiesLocked(t reflect.Type) *StructProperties { if f.Name == "XXX_unrecognized" { // special case prop.unrecField = toField(&f) } - oneof := f.Tag.Get("protobuf_oneof") // special case - if oneof != "" { - // Oneof fields don't use the traditional protobuf tag. - p.OrigName = oneof - } + oneof := f.Tag.Get("protobuf_oneof") != "" // special case prop.Prop[i] = p prop.order[i] = i if debug { @@ -715,7 +711,7 @@ func getPropertiesLocked(t reflect.Type) *StructProperties { } print("\n") } - if p.enc == nil && !strings.HasPrefix(f.Name, "XXX_") && oneof == "" { + if p.enc == nil && !strings.HasPrefix(f.Name, "XXX_") && !oneof { fmt.Fprintln(os.Stderr, "proto: no encoder for", f.Name, f.Type.String(), "[GetProperties]") } } diff --git a/vendor/github.com/golang/protobuf/proto/text_parser.go b/vendor/github.com/golang/protobuf/proto/text_parser.go index b5fba5b..b5e1c8e 100644 --- a/vendor/github.com/golang/protobuf/proto/text_parser.go +++ b/vendor/github.com/golang/protobuf/proto/text_parser.go @@ -663,8 +663,7 @@ func (p *textParser) readStruct(sv reflect.Value, terminator string) error { return err } reqFieldErr = err - } - if props.Required { + } else if props.Required { reqCount-- } diff --git a/vendor/golang.org/x/net/context/context.go b/vendor/golang.org/x/net/context/context.go index 134654c..7350678 100644 --- a/vendor/golang.org/x/net/context/context.go +++ b/vendor/golang.org/x/net/context/context.go @@ -61,7 +61,7 @@ type Context interface { // // // Stream generates values with DoSomething and sends them to out // // until DoSomething returns an error or ctx.Done is closed. - // func Stream(ctx context.Context, out chan<- Value) error { + // func Stream(ctx context.Context, out <-chan Value) error { // for { // v, err := DoSomething(ctx) // if err != nil { diff --git a/vendor/google.golang.org/grpc/backoff.go b/vendor/google.golang.org/grpc/backoff.go index dc4858e..d0113ec 100644 --- a/vendor/google.golang.org/grpc/backoff.go +++ b/vendor/google.golang.org/grpc/backoff.go @@ -8,7 +8,7 @@ import ( // DefaultBackoffConfig uses values specified for backoff in // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. var ( - DefaultBackoffConfig = BackoffConfig{ + DefaultBackoffConfig = &BackoffConfig{ MaxDelay: 120 * time.Second, baseDelay: 1.0 * time.Second, factor: 1.6, @@ -33,10 +33,7 @@ type BackoffConfig struct { // MaxDelay is the upper bound of backoff delay. MaxDelay time.Duration - // TODO(stevvooe): The following fields are not exported, as allowing - // changes would violate the current GRPC specification for backoff. If - // GRPC decides to allow more interesting backoff strategies, these fields - // may be opened up in the future. + // TODO(stevvooe): The following fields are not exported, as allowing changes // baseDelay is the amount of time to wait before retrying after the first // failure. @@ -49,16 +46,7 @@ type BackoffConfig struct { jitter float64 } -func setDefaults(bc *BackoffConfig) { - md := bc.MaxDelay - *bc = DefaultBackoffConfig - - if md > 0 { - bc.MaxDelay = md - } -} - -func (bc BackoffConfig) backoff(retries int) (t time.Duration) { +func (bc *BackoffConfig) backoff(retries int) (t time.Duration) { if retries == 0 { return bc.baseDelay } diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 6de86e9..1562c0f 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -115,21 +115,9 @@ func WithPicker(p Picker) DialOption { } } -// WithBackoffMaxDelay configures the dialer to use the provided maximum delay -// when backing off after failed connection attempts. -func WithBackoffMaxDelay(md time.Duration) DialOption { - return WithBackoffConfig(BackoffConfig{MaxDelay: md}) -} - // WithBackoffConfig configures the dialer to use the provided backoff // parameters after connection failures. -// -// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up -// for use. -func WithBackoffConfig(b BackoffConfig) DialOption { - // Set defaults to ensure that provided BackoffConfig is valid and - // unexported fields get default values. - setDefaults(&b) +func WithBackoffConfig(b *BackoffConfig) DialOption { return withBackoff(b) } diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go deleted file mode 100644 index 588f59e..0000000 --- a/vendor/google.golang.org/grpc/interceptor.go +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -package grpc - -import ( - "golang.org/x/net/context" -) - -// UnaryServerInfo consists of various information about a unary RPC on -// server side. All per-rpc information may be mutated by the interceptor. -type UnaryServerInfo struct { - // Server is the service implementation the user provides. This is read-only. - Server interface{} - // FullMethod is the full RPC method string, i.e., /package.service/method. - FullMethod string -} - -// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal -// execution of a unary RPC. -type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) - -// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info -// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper -// of the service method implementation. It is the responsibility of the interceptor to invoke handler -// to complete the RPC. -type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) - -// StreamServerInfo consists of various information about a streaming RPC on -// server side. All per-rpc information may be mutated by the interceptor. -type StreamServerInfo struct { - // FullMethod is the full RPC method string, i.e., /package.service/method. - FullMethod string - // IsClientStream indicates whether the RPC is a client streaming RPC. - IsClientStream bool - // IsServerStream indicates whether the RPC is a server streaming RPC. - IsServerStream bool -} - -// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server. -// info contains all the information of this RPC the interceptor can operate on. And handler is the -// service method implementation. It is the responsibility of the interceptor to invoke handler to -// complete the RPC. -type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 3192f01..8ad335e 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -409,10 +409,10 @@ func convertCode(err error) codes.Code { return codes.Unknown } -// SupportPackageIsVersion2 is referenced from generated protocol buffer files +// SupportPackageIsVersion1 is referenced from generated protocol buffer files // to assert that that code is compatible with this version of the grpc package. // // This constant may be renamed in the future if a change in the generated code // requires a synchronised update of grpc-go and protoc-gen-go. This constant // should not be referenced from any other code. -const SupportPackageIsVersion2 = true +const SupportPackageIsVersion1 = true diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index d3a8073..bdf68a0 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -57,7 +57,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { @@ -99,8 +99,6 @@ type options struct { codec Codec cp Compressor dc Decompressor - unaryInt UnaryServerInterceptor - streamInt StreamServerInterceptor maxConcurrentStreams uint32 useHandlerImpl bool // use http.Handler-based server } @@ -142,29 +140,6 @@ func Creds(c credentials.Credentials) ServerOption { } } -// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the -// server. Only one unary interceptor can be installed. The construction of multiple -// interceptors (e.g., chaining) can be implemented at the caller. -func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { - return func(o *options) { - if o.unaryInt != nil { - panic("The unary server interceptor has been set.") - } - o.unaryInt = i - } -} - -// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the -// server. Only one stream interceptor can be installed. -func StreamInterceptor(i StreamServerInterceptor) ServerOption { - return func(o *options) { - if o.streamInt != nil { - panic("The stream server interceptor has been set.") - } - o.streamInt = i - } -} - // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -519,7 +494,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return nil } - reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) + reply, appErr := md.Handler(srv.server, stream.Context(), df) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -597,18 +572,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.mu.Unlock() }() } - var appErr error - if s.opts.streamInt == nil { - appErr = sd.Handler(srv.server, ss) - } else { - info := &StreamServerInfo{ - FullMethod: stream.Method(), - IsClientStream: sd.ClientStreams, - IsServerStream: sd.ServerStreams, - } - appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler) - } - if appErr != nil { + if appErr := sd.Handler(srv.server, ss); appErr != nil { if err, ok := appErr.(rpcError); ok { ss.statusCode = err.code ss.statusDesc = err.desc diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 565fc3c..b832078 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -47,14 +47,12 @@ import ( "google.golang.org/grpc/transport" ) -// StreamHandler defines the handler called by gRPC server to complete the -// execution of a streaming RPC. -type StreamHandler func(srv interface{}, stream ServerStream) error +type streamHandler func(srv interface{}, stream ServerStream) error // StreamDesc represents a streaming RPC service's method specification. type StreamDesc struct { StreamName string - Handler StreamHandler + Handler streamHandler // At least one of these is true. ServerStreams bool @@ -81,7 +79,7 @@ type Stream interface { // ClientStream defines the interface a client stream has to satify. type ClientStream interface { - // Header returns the header metadata received from the server if there + // Header returns the header metedata received from the server if there // is any. It blocks if the metadata is not ready to read. Header() (metadata.MD, error) // Trailer returns the trailer metadata from the server. It must be called diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go index 7e9bdf3..b2e602e 100644 --- a/vendor/google.golang.org/grpc/transport/control.go +++ b/vendor/google.golang.org/grpc/transport/control.go @@ -162,6 +162,10 @@ func (qb *quotaPool) acquire() <-chan int { type inFlow struct { // The inbound flow control limit for pending data. limit uint32 + // conn points to the shared connection-level inFlow that is shared + // by all streams on that conn. It is nil for the inFlow on the conn + // directly. + conn *inFlow mu sync.Mutex // pendingData is the overall data which have been received but not been @@ -172,39 +176,97 @@ type inFlow struct { pendingUpdate uint32 } -// onData is invoked when some data frame is received. It updates pendingData. +// onData is invoked when some data frame is received. It increments not only its +// own pendingData but also that of the associated connection-level flow. func (f *inFlow) onData(n uint32) error { + if n == 0 { + return nil + } f.mu.Lock() defer f.mu.Unlock() - f.pendingData += n - if f.pendingData+f.pendingUpdate > f.limit { - return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit) + if f.pendingData+f.pendingUpdate+n > f.limit { + return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit) } + if f.conn != nil { + if err := f.conn.onData(n); err != nil { + return ConnectionErrorf("%v", err) + } + } + f.pendingData += n return nil } -// onRead is invoked when the application reads the data. It returns the window size -// to be sent to the peer. -func (f *inFlow) onRead(n uint32) uint32 { +// adjustConnPendingUpdate increments the connection level pending updates by n. +// This is called to make the proper connection level window updates when +// receiving data frame targeting the canceled RPCs. +func (f *inFlow) adjustConnPendingUpdate(n uint32) (uint32, error) { + if n == 0 || f.conn != nil { + return 0, nil + } f.mu.Lock() defer f.mu.Unlock() - if f.pendingData == 0 { + if f.pendingData+f.pendingUpdate+n > f.limit { + return 0, ConnectionErrorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit) + } + f.pendingUpdate += n + if f.pendingUpdate >= f.limit/4 { + ret := f.pendingUpdate + f.pendingUpdate = 0 + return ret, nil + } + return 0, nil + +} + +// connOnRead updates the connection level states when the application consumes data. +func (f *inFlow) connOnRead(n uint32) uint32 { + if n == 0 || f.conn != nil { return 0 } + f.mu.Lock() + defer f.mu.Unlock() f.pendingData -= n f.pendingUpdate += n if f.pendingUpdate >= f.limit/4 { - wu := f.pendingUpdate + ret := f.pendingUpdate f.pendingUpdate = 0 - return wu + return ret } return 0 } -func (f *inFlow) resetPendingData() uint32 { +// onRead is invoked when the application reads the data. It returns the window updates +// for both stream and connection level. +func (f *inFlow) onRead(n uint32) (swu, cwu uint32) { + if n == 0 { + return + } + f.mu.Lock() + defer f.mu.Unlock() + if f.pendingData == 0 { + // pendingData has been adjusted by restoreConn. + return + } + f.pendingData -= n + f.pendingUpdate += n + if f.pendingUpdate >= f.limit/4 { + swu = f.pendingUpdate + f.pendingUpdate = 0 + } + cwu = f.conn.connOnRead(n) + return +} + +// restoreConn is invoked when a stream is terminated. It removes its stake in +// the connection-level flow and resets its own state. +func (f *inFlow) restoreConn() uint32 { + if f.conn == nil { + return 0 + } f.mu.Lock() defer f.mu.Unlock() n := f.pendingData f.pendingData = 0 - return n + f.pendingUpdate = 0 + return f.conn.connOnRead(n) } diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 8e916b0..77c0544 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -140,6 +140,29 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e conn.Close() } }() + // Send connection preface to server. + n, err := conn.Write(clientPreface) + if err != nil { + return nil, ConnectionErrorf("transport: %v", err) + } + if n != len(clientPreface) { + return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) + } + framer := newFramer(conn) + if initialWindowSize != defaultWindowSize { + err = framer.writeSettings(true, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}) + } else { + err = framer.writeSettings(true) + } + if err != nil { + return nil, ConnectionErrorf("transport: %v", err) + } + // Adjust the connection flow control window if needed. + if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if err := framer.writeWindowUpdate(true, 0, delta); err != nil { + return nil, ConnectionErrorf("transport: %v", err) + } + } ua := primaryUA if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua @@ -155,7 +178,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e writableChan: make(chan int, 1), shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), - framer: newFramer(conn), + framer: framer, hBuf: &buf, hEnc: hpack.NewEncoder(&buf), controlBuf: newRecvBuffer(), @@ -168,49 +191,28 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, } - // Start the reader goroutine for incoming message. Each transport has - // a dedicated goroutine which reads HTTP2 frame from network. Then it - // dispatches the frame to the corresponding stream entity. - go t.reader() - // Send connection preface to server. - n, err := t.conn.Write(clientPreface) - if err != nil { - t.Close() - return nil, ConnectionErrorf("transport: %v", err) - } - if n != len(clientPreface) { - t.Close() - return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) - } - if initialWindowSize != defaultWindowSize { - err = t.framer.writeSettings(true, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}) - } else { - err = t.framer.writeSettings(true) - } - if err != nil { - t.Close() - return nil, ConnectionErrorf("transport: %v", err) - } - // Adjust the connection flow control window if needed. - if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { - if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { - t.Close() - return nil, ConnectionErrorf("transport: %v", err) - } - } go t.controller() t.writableChan <- 0 + // Start the reader goroutine for incoming message. The threading model + // on receiving is that each transport has a dedicated goroutine which + // reads HTTP2 frame from network. Then it dispatches the frame to the + // corresponding stream entity. + go t.reader() return t, nil } func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { + fc := &inFlow{ + limit: initialWindowSize, + conn: t.fc, + } // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ id: t.nextID, method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), - fc: &inFlow{limit: initialWindowSize}, + fc: fc, sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), } @@ -235,10 +237,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if dl, ok := ctx.Deadline(); ok { timeout = dl.Sub(time.Now()) } - select { - case <-ctx.Done(): - return nil, ContextErr(ctx.Err()) - default: + if err := ctx.Err(); err != nil { + return nil, ContextErr(err) } pr := &peer.Peer{ Addr: t.conn.RemoteAddr(), @@ -404,10 +404,8 @@ func (t *http2Client) CloseStream(s *Stream, err error) { // other goroutines. s.cancel() s.mu.Lock() - if q := s.fc.resetPendingData(); q > 0 { - if n := t.fc.onRead(q); n > 0 { - t.controlBuf.put(&windowUpdate{0, n}) - } + if q := s.fc.restoreConn(); q > 0 { + t.controlBuf.put(&windowUpdate{0, q}) } if s.state == streamDone { s.mu.Unlock() @@ -429,9 +427,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) { // accessed any more. func (t *http2Client) Close() (err error) { t.mu.Lock() - if t.state == reachable { - close(t.errorChan) - } if t.state == closing { t.mu.Unlock() return errors.New("transport: Close() was already called") @@ -510,10 +505,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the transport. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { - if _, ok := err.(StreamError); ok { - // Return the connection quota back. - t.sendQuotaPool.add(len(p)) - } if t.framer.adjustNumWriters(-1) == 0 { // This writer is the last one in this batch and has the // responsibility to flush the buffered frames. It queues @@ -523,16 +514,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } return err } - select { - case <-s.ctx.Done(): - t.sendQuotaPool.add(len(p)) - if t.framer.adjustNumWriters(-1) == 0 { - t.controlBuf.put(&flushIO{}) - } - t.writableChan <- 0 - return ContextErr(s.ctx.Err()) - default: - } if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { // Do a force flush iff this is last frame for the entire gRPC message // and the caller is the only writer at this moment. @@ -579,39 +560,41 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Client) updateWindow(s *Stream, n uint32) { - if w := t.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) + swu, cwu := s.fc.onRead(n) + if swu > 0 { + t.controlBuf.put(&windowUpdate{s.id, swu}) } - if w := s.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + if cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) } } func (t *http2Client) handleData(f *http2.DataFrame) { - size := len(f.Data()) - if err := t.fc.onData(uint32(size)); err != nil { - t.notifyError(ConnectionErrorf("%v", err)) - return - } // Select the right stream to dispatch. + size := len(f.Data()) s, ok := t.getStream(f) if !ok { - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) + cwu, err := t.fc.adjustConnPendingUpdate(uint32(size)) + if err != nil { + t.notifyError(err) + return + } + if cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) } return } if size > 0 { - s.mu.Lock() - if s.state == streamDone { - s.mu.Unlock() - // The stream has been closed. Release the corresponding quota. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } - return - } if err := s.fc.onData(uint32(size)); err != nil { + if _, ok := err.(ConnectionError); ok { + t.notifyError(err) + return + } + s.mu.Lock() + if s.state == streamDone { + s.mu.Unlock() + return + } s.state = streamDone s.statusCode = codes.Internal s.statusDesc = err.Error() @@ -620,7 +603,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) { t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } - s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index 6f233d9..68f8203 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -139,11 +139,15 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI // operateHeader takes action on the decoded headers. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) { buf := newRecvBuffer() + fc := &inFlow{ + limit: initialWindowSize, + conn: t.fc, + } s := &Stream{ id: frame.Header().StreamID, st: t, buf: buf, - fc: &inFlow{limit: initialWindowSize}, + fc: fc, } var state decodeState @@ -303,46 +307,42 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Server) updateWindow(s *Stream, n uint32) { - if w := t.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) + swu, cwu := s.fc.onRead(n) + if swu > 0 { + t.controlBuf.put(&windowUpdate{s.id, swu}) } - if w := s.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + if cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) } } func (t *http2Server) handleData(f *http2.DataFrame) { - size := len(f.Data()) - if err := t.fc.onData(uint32(size)); err != nil { - grpclog.Printf("transport: http2Server %v", err) - t.Close() - return - } // Select the right stream to dispatch. + size := len(f.Data()) s, ok := t.getStream(f) if !ok { - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) + cwu, err := t.fc.adjustConnPendingUpdate(uint32(size)) + if err != nil { + grpclog.Printf("transport: http2Server %v", err) + t.Close() + return + } + if cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) } return } if size > 0 { - s.mu.Lock() - if s.state == streamDone { - s.mu.Unlock() - // The stream has been closed. Release the corresponding quota. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } - return - } if err := s.fc.onData(uint32(size)); err != nil { - s.mu.Unlock() + if _, ok := err.(ConnectionError); ok { + grpclog.Printf("transport: http2Server %v", err) + t.Close() + return + } t.closeStream(s) t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } - s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? @@ -516,10 +516,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { // TODO(zhaoq): Support multi-writers for a single stream. var writeHeaderFrame bool s.mu.Lock() - if s.state == streamDone { - s.mu.Unlock() - return StreamErrorf(codes.Unknown, "the stream has been done") - } if !s.headerOk { writeHeaderFrame = true s.headerOk = true @@ -587,10 +583,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { // Got some quota. Try to acquire writing privilege on the // transport. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { - if _, ok := err.(StreamError); ok { - // Return the connection quota back. - t.sendQuotaPool.add(ps) - } if t.framer.adjustNumWriters(-1) == 0 { // This writer is the last one in this batch and has the // responsibility to flush the buffered frames. It queues @@ -600,16 +592,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } return err } - select { - case <-s.ctx.Done(): - t.sendQuotaPool.add(ps) - if t.framer.adjustNumWriters(-1) == 0 { - t.controlBuf.put(&flushIO{}) - } - t.writableChan <- 0 - return ContextErr(s.ctx.Err()) - default: - } var forceFlush bool if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { forceFlush = true @@ -707,22 +689,20 @@ func (t *http2Server) closeStream(s *Stream) { t.mu.Lock() delete(t.activeStreams, s.id) t.mu.Unlock() - // In case stream sending and receiving are invoked in separate - // goroutines (e.g., bi-directional streaming), cancel needs to be - // called to interrupt the potential blocking on other goroutines. - s.cancel() - s.mu.Lock() - if q := s.fc.resetPendingData(); q > 0 { - if w := t.fc.onRead(q); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } + if q := s.fc.restoreConn(); q > 0 { + t.controlBuf.put(&windowUpdate{0, q}) } + s.mu.Lock() if s.state == streamDone { s.mu.Unlock() return } s.state = streamDone s.mu.Unlock() + // In case stream sending and receiving are invoked in separate + // goroutines (e.g., bi-directional streaming), cancel needs to be + // called to interrupt the potential blocking on other goroutines. + s.cancel() } func (t *http2Server) RemoteAddr() net.Addr { diff --git a/vendor/vendor.json b/vendor/vendor.json index 0b1df34..6a434b3 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -31,10 +31,10 @@ "revisionTime": "2016-04-28T23:07:08Z" }, { - "checksumSHA1": "FczzogSoZcKU3h21tCUyHzMsnBY=", + "checksumSHA1": "UQtJP2n/HEBhHaq6a9R2hCHgy+8=", "path": "github.com/golang/protobuf/proto", - "revision": "7cc19b78d562895b13596ddce7aafb59dd789318", - "revisionTime": "2016-04-25T21:53:00Z" + "revision": "8d92cf5fc15a4382f8964b08e1f42a75c0591aa3", + "revisionTime": "2016-03-18T18:57:00Z" }, { "checksumSHA1": "KIX/3RadQkfl4ZxCmOQ01vAGLEI=", @@ -79,10 +79,10 @@ "revisionTime": "2016-05-04T12:50:21Z" }, { - "checksumSHA1": "9jjO5GjLa0XF/nfWihF02RoH4qc=", + "checksumSHA1": "pancewZW3HwGvpDwfH5Imrbadc4=", "path": "golang.org/x/net/context", - "revision": "35ec611a141ee705590b9eb64d673f9e6dfeb1ac", - "revisionTime": "2016-04-28T06:05:50Z" + "revision": "fb93926129b8ec0056f2f458b1f519654814edf0", + "revisionTime": "2016-04-12T22:48:50Z" }, { "checksumSHA1": "TXZ/hsXjV3EsbeCx+s01AS/d/qw=", @@ -109,58 +109,58 @@ "revisionTime": "2016-04-28T06:05:50Z" }, { - "checksumSHA1": "JGZ+ltnbzDw+juFHmbwEpBwGsS8=", + "checksumSHA1": "F3QG/ODWd++7nooYFcbx9FoS4Uo=", "path": "google.golang.org/grpc", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "08icuA15HRkdYCt6H+Cs90RPQsY=", "path": "google.golang.org/grpc/codes", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "a+C+FoyX7Tc4iNFhBndt/y/iPEg=", "path": "google.golang.org/grpc/credentials", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "3Lt5hNAG8qJAYSsNghR5uA1zQns=", "path": "google.golang.org/grpc/grpclog", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "T3Q0p8kzvXFnRkMaK/G8mCv6mc0=", "path": "google.golang.org/grpc/internal", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "IlzBp7dylW8F4D7eINj8xHt/jMk=", "path": "google.golang.org/grpc/metadata", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "fGJlPKvpYAZ6+HRX3qScFnZ9dd4=", "path": "google.golang.org/grpc/naming", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "3RRoLeH6X2//7tVClOVzxW2bY+E=", "path": "google.golang.org/grpc/peer", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { - "checksumSHA1": "5jO2rWNnAmzT3oMOQyWPEurpMgc=", + "checksumSHA1": "Gi8N7YQnhBckupcw1IZnL7n/PiI=", "path": "google.golang.org/grpc/transport", - "revision": "7f24e5e90e7e79ee1c68359b05908b668a23ffb2", - "revisionTime": "2016-05-03T17:54:26Z" + "revision": "1a5928317d2d6526b75552438d5593e005f36477", + "revisionTime": "2016-04-20T01:13:47Z" }, { "checksumSHA1": "QPs8F/aqKAAQJr8dLc3CVQ5SxlM=",