diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..5b48abc Binary files /dev/null and b/.DS_Store differ diff --git a/proxy/README.md b/proxy/README.md deleted file mode 120000 index 71bfc07..0000000 --- a/proxy/README.md +++ /dev/null @@ -1 +0,0 @@ -DOC.md \ No newline at end of file diff --git a/proxy/README.md b/proxy/README.md new file mode 100644 index 0000000..85c411a --- /dev/null +++ b/proxy/README.md @@ -0,0 +1,83 @@ +# proxy +-- + import "github.com/mwitkow/grpc-proxy/proxy" + +Package proxy provides a reverse proxy handler for gRPC. + +The implementation allows a `grpc.Server` to pass a received ServerStream to a +ClientStream without understanding the semantics of the messages exchanged. It +basically provides a transparent reverse-proxy. + +This package is intentionally generic, exposing a `StreamDirector` function that +allows users of this package to implement whatever logic of backend-picking, +dialing and service verification to perform. + +See examples on documented functions. + +## Usage + +#### func Codec + +```go +func Codec() grpc.Codec +``` +Codec returns a proxying grpc.Codec with the default protobuf codec as parent. + +See CodecWithParent. + +#### func CodecWithParent + +```go +func CodecWithParent(fallback grpc.Codec) grpc.Codec +``` +CodecWithParent returns a proxying grpc.Codec with a user provided codec as +parent. + +This codec is *crucial* to the functioning of the proxy. It allows the proxy +server to be oblivious to the schema of the forwarded messages. It basically +treats a gRPC message frame as raw bytes. However, if the server handler, or the +client caller are not proxy-internal functions it will fall back to trying to +decode the message using a fallback codec. + +#### func RegisterService + +```go +func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) +``` +RegisterService sets up a proxy handler for a particular gRPC service and +method. The behaviour is the same as if you were registering a handler method, +e.g. from a codegenerated pb.go file. + +This can *only* be used if the `server` also uses grpcproxy.CodecForServer() +ServerOption. + +#### func TransparentHandler + +```go +func TransparentHandler(director StreamDirector) grpc.StreamHandler +``` +TransparentHandler returns a handler that attempts to proxy all requests that +are not registered in the server. The indented use here is as a transparent +proxy, where the server doesn't know about the services implemented by the +backends. It should be used as a `grpc.UnknownServiceHandler`. + +This can *only* be used if the `server` also uses grpcproxy.CodecForServer() +ServerOption. + +#### type StreamDirector + +```go +type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) +``` + +StreamDirector returns a gRPC ClientConn to be used to forward the call to. + +The presence of the `Context` allows for rich filtering, e.g. based on Metadata +(headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC +error should be returned. + +It is worth noting that the StreamDirector will be fired *after* all server-side +stream interceptors are invoked. So decisions around authorization, monitoring +etc. are better to be handled there. + +See the rather rich example. diff --git a/proxy/handler.go b/proxy/handler.go index 83920f1..f43b05a 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -9,7 +9,8 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/transport" + // "google.golang.org/grpc/transport" + // "google.golang.org/grpc/transport" ) var ( @@ -17,6 +18,7 @@ var ( ServerStreams: true, ClientStreams: true, } + HandleEndCallback func(context.Context, *grpc.ClientConn) ) // RegisterService sets up a proxy handler for a particular gRPC service and method. @@ -60,10 +62,10 @@ type handler struct { // forwarding it to a ClientStream established against the relevant ClientConn. func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { // little bit of gRPC internals never hurt anyone - lowLevelServerStream, ok := transport.StreamFromContext(serverStream.Context()) - if !ok { - return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") - } + lowLevelServerStream := grpc.ServerTransportStreamFromContext(serverStream.Context()) + // if !ok { + // return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + // } fullMethodName := lowLevelServerStream.Method() // We require that the director's returned context inherits from the serverStream.Context(). outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) @@ -71,6 +73,11 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if err != nil { return err } + if HandleEndCallback == nil { + defer backendConn.Close() + } else { + HandleEndCallback(clientCtx, backendConn) + } // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) if err != nil {