diff --git a/Makefile b/Makefile index cd8cf679..ba997bef 100644 --- a/Makefile +++ b/Makefile @@ -76,8 +76,8 @@ release: clean build.linux build/osx/$(BINARY) github-release upload -u grepplabs -r $(BINARY) -t $(TAG) -f build/osx/$(BINARY) -n darwin/amd64/$(BINARY) github-release info -u grepplabs -r $(BINARY) -protoc.auth: - protoc -I plugin/auth/proto/ plugin/auth/proto/auth.proto --go_out=plugins=grpc:plugin/auth/proto/ +protoc.local-auth: + protoc -I plugin/local-auth/proto/ plugin/local-auth/proto/auth.proto --go_out=plugins=grpc:plugin/local-auth/proto/ plugin.auth-user: CGO_ENABLED=0 go build -o build/auth-user $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-user/main.go diff --git a/README.md b/README.md index 87a29045..84184aa5 100644 --- a/README.md +++ b/README.md @@ -82,17 +82,17 @@ See: --proxy-listener-cert-file "server-cert.pem" \ --proxy-listener-ca-chain-cert-file "ca.pem" \ --proxy-listener-tls-enable \ - --proxy-listener-auth-enable \ - --proxy-listener-auth-command build/auth-user \ - --proxy-listener-auth-param "--username=my-test-user" \ - --proxy-listener-auth-param "--password=my-test-password" + --auth-local-enable \ + --auth-local-command build/auth-user \ + --auth-local-param "--username=my-test-user" \ + --auth-local-param "--password=my-test-password" make clean build plugin.auth-ldap && build/kafka-proxy server \ - --proxy-listener-auth-enable \ - --proxy-listener-auth-command build/auth-ldap \ - --proxy-listener-auth-param "--url=ldaps://ldap.example.com:636" \ - --proxy-listener-auth-param "--user-dn=cn=users,dc=exemple,dc=com" \ - --proxy-listener-auth-param "--user-attr=uid" \ + --auth-local-enable \ + --auth-local-command build/auth-ldap \ + --auth-local-param "--url=ldaps://ldap.example.com:636" \ + --auth-local-param "--user-dn=cn=users,dc=exemple,dc=com" \ + --auth-local-param "--user-attr=uid" \ --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 094acef5..0dfad476 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -19,7 +19,7 @@ import ( "time" "errors" - "github.com/grepplabs/kafka-proxy/plugin/auth/shared" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" ) @@ -75,11 +75,27 @@ func init() { Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", "", "Password to decrypt rsa private key") Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file") - // authentication plugin - Server.Flags().BoolVar(&c.Proxy.Auth.Enable, "proxy-listener-auth-enable", false, "Enable SASL/PLAIN listener authentication") - Server.Flags().StringVar(&c.Proxy.Auth.Command, "proxy-listener-auth-command", "", "Path to authentication plugin binary") - Server.Flags().StringArrayVar(&c.Proxy.Auth.Parameters, "proxy-listener-auth-param", []string{}, "Authentication plugin parameter") - Server.Flags().StringVar(&c.Proxy.Auth.LogLevel, "proxy-listener-auth-log-level", "trace", "Log level of the auth plugin") + // local authentication plugin + Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers") + Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary") + Server.Flags().StringArrayVar(&c.Auth.Local.Parameters, "auth-local-param", []string{}, "Authentication plugin parameter") + Server.Flags().StringVar(&c.Auth.Local.LogLevel, "auth-local-log-level", "trace", "Log level of the auth plugin") + + Server.Flags().BoolVar(&c.Auth.Gateway.Client.Enable, "auth-gateway-client-enable", false, "Enable gateway client authentication") + Server.Flags().StringVar(&c.Auth.Gateway.Client.Command, "auth-gateway-client-command", "", "Path to authentication plugin binary") + Server.Flags().StringArrayVar(&c.Auth.Gateway.Client.Parameters, "auth-gateway-client-param", []string{}, "Authentication plugin parameter") + Server.Flags().StringVar(&c.Auth.Gateway.Client.LogLevel, "auth-gateway-client-log-level", "trace", "Log level of the auth plugin") + Server.Flags().StringVar(&c.Auth.Gateway.Client.Method, "auth-gateway-client-method", "", "Authentication method") + Server.Flags().Uint64Var(&c.Auth.Gateway.Client.Magic, "auth-gateway-client-magic", 0, "Magic bytes sent in the handshake") + Server.Flags().DurationVar(&c.Auth.Gateway.Client.Timeout, "auth-gateway-client-timeout", 10*time.Second, "Authentication timeout") + + Server.Flags().BoolVar(&c.Auth.Gateway.Server.Enable, "auth-gateway-server-enable", false, "Enable proxy server authentication") + Server.Flags().StringVar(&c.Auth.Gateway.Server.Command, "auth-gateway-server-command", "", "Path to authentication plugin binary") + Server.Flags().StringArrayVar(&c.Auth.Gateway.Server.Parameters, "auth-gateway-server-param", []string{}, "Authentication plugin parameter") + Server.Flags().StringVar(&c.Auth.Gateway.Server.LogLevel, "auth-gateway-server-log-level", "trace", "Log level of the auth plugin") + Server.Flags().StringVar(&c.Auth.Gateway.Server.Method, "auth-gateway-server-method", "", "Authentication method") + Server.Flags().Uint64Var(&c.Auth.Gateway.Server.Magic, "auth-gateway-server-magic", 0, "Magic bytes sent in the handshake") + Server.Flags().DurationVar(&c.Auth.Gateway.Server.Timeout, "auth-gateway-server-timeout", 10*time.Second, "Authentication timeout") // kafka Server.Flags().StringVar(&c.Kafka.ClientID, "kafka-client-id", "kafka-proxy", "An optional identifier to track the source of requests") @@ -127,8 +143,8 @@ func Run(_ *cobra.Command, _ []string) { logrus.Infof("Starting kafka-proxy version %s", config.Version) var passwordAuthenticator shared.PasswordAuthenticator - if c.Proxy.Auth.Enable { - client := NewPluginClient() + if c.Auth.Local.Enable { + client := NewLocalAuthPluginClient() defer client.Kill() rpcClient, err := client.Client() @@ -257,14 +273,14 @@ func SetLogger() { logrus.SetLevel(level) } -func NewPluginClient() *plugin.Client { +func NewLocalAuthPluginClient() *plugin.Client { jsonFormat := false if c.Log.Format == "json" { jsonFormat = true } logger := hclog.New(&hclog.LoggerOptions{ Output: os.Stdout, - Level: hclog.LevelFromString(c.Proxy.Auth.LogLevel), + Level: hclog.LevelFromString(c.Auth.Local.LogLevel), Name: "plugin", JSONFormat: jsonFormat, TimeFormat: time.RFC3339, @@ -274,7 +290,7 @@ func NewPluginClient() *plugin.Client { HandshakeConfig: shared.Handshake, Plugins: shared.PluginMap, Logger: logger, - Cmd: exec.Command(c.Proxy.Auth.Command, c.Proxy.Auth.Parameters...), + Cmd: exec.Command(c.Auth.Local.Command, c.Auth.Local.Parameters...), AllowedProtocols: []plugin.Protocol{ plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, }) diff --git a/cmd/plugin-auth-ldap/main.go b/cmd/plugin-auth-ldap/main.go index 52c460cf..5f4c27ff 100644 --- a/cmd/plugin-auth-ldap/main.go +++ b/cmd/plugin-auth-ldap/main.go @@ -4,7 +4,7 @@ import ( "crypto/tls" "flag" "fmt" - "github.com/grepplabs/kafka-proxy/plugin/auth/shared" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-plugin" "github.com/sirupsen/logrus" diff --git a/cmd/plugin-auth-user/main.go b/cmd/plugin-auth-user/main.go index 158909d0..e5c06ccc 100644 --- a/cmd/plugin-auth-user/main.go +++ b/cmd/plugin-auth-user/main.go @@ -2,7 +2,7 @@ package main import ( "flag" - "github.com/grepplabs/kafka-proxy/plugin/auth/shared" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" "github.com/hashicorp/go-plugin" "github.com/sirupsen/logrus" "os" diff --git a/config/config.go b/config/config.go index 3dad6605..3cfdaf43 100644 --- a/config/config.go +++ b/config/config.go @@ -57,12 +57,34 @@ type Config struct { ListenerKeyPassword string CAChainCertFile string } - Auth struct { + } + Auth struct { + Local struct { Enable bool Command string Parameters []string LogLevel string } + Gateway struct { + Client struct { + Enable bool + Method string + Magic uint64 + Command string + Parameters []string + LogLevel string + Timeout time.Duration + } + Server struct { + Enable bool + Method string + Magic uint64 + Command string + Parameters []string + LogLevel string + Timeout time.Duration + } + } } Kafka struct { ClientID string @@ -216,8 +238,21 @@ func (c *Config) Validate() error { if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") { return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled") } - if c.Proxy.Auth.Enable && c.Proxy.Auth.Command == "" { - return errors.New("Auth.Command is required when Proxy.Auth is enabled") + if c.Auth.Local.Enable && c.Auth.Local.Command == "" { + return errors.New("Command is required when Auth.Local.Enable is enabled") + } + if c.Auth.Gateway.Client.Enable && (c.Auth.Gateway.Client.Command == "" || c.Auth.Gateway.Client.Method == "" || c.Auth.Gateway.Client.Magic == 0) { + return errors.New("Command, Method and Magic are required when Auth.Gateway.Client.Enable is enabled") + } + if c.Auth.Gateway.Client.Enable && c.Auth.Gateway.Client.Timeout <= 0 { + return errors.New("Auth.Gateway.Client.Timeout must be greater than 0") + } + + if c.Auth.Gateway.Server.Enable && (c.Auth.Gateway.Server.Command == "" || c.Auth.Gateway.Server.Method == "" || c.Auth.Gateway.Server.Magic == 0) { + return errors.New("Command, Method and Magic are required when Auth.Gateway.Server.Enable is enabled") + } + if c.Auth.Gateway.Server.Enable && c.Auth.Gateway.Server.Timeout <= 0 { + return errors.New("Auth.Gateway.Server.Timeout must be greater than 0") } return nil } diff --git a/plugin/auth/proto/auth.pb.go b/plugin/local-auth/proto/auth.pb.go similarity index 100% rename from plugin/auth/proto/auth.pb.go rename to plugin/local-auth/proto/auth.pb.go diff --git a/plugin/auth/proto/auth.proto b/plugin/local-auth/proto/auth.proto similarity index 100% rename from plugin/auth/proto/auth.proto rename to plugin/local-auth/proto/auth.proto diff --git a/plugin/auth/shared/grpc.go b/plugin/local-auth/shared/grpc.go similarity index 94% rename from plugin/auth/shared/grpc.go rename to plugin/local-auth/shared/grpc.go index 384f793c..8c354d5d 100644 --- a/plugin/auth/shared/grpc.go +++ b/plugin/local-auth/shared/grpc.go @@ -1,7 +1,7 @@ package shared import ( - "github.com/grepplabs/kafka-proxy/plugin/auth/proto" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/proto" "github.com/hashicorp/go-plugin" "golang.org/x/net/context" ) diff --git a/plugin/auth/shared/interface.go b/plugin/local-auth/shared/interface.go similarity index 95% rename from plugin/auth/shared/interface.go rename to plugin/local-auth/shared/interface.go index c5660e0c..2b9a6cab 100644 --- a/plugin/auth/shared/interface.go +++ b/plugin/local-auth/shared/interface.go @@ -5,7 +5,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - "github.com/grepplabs/kafka-proxy/plugin/auth/proto" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/proto" "github.com/hashicorp/go-plugin" "net/rpc" ) diff --git a/plugin/auth/shared/rpc.go b/plugin/local-auth/shared/rpc.go similarity index 100% rename from plugin/auth/shared/rpc.go rename to plugin/local-auth/shared/rpc.go diff --git a/proxy/auth.go b/proxy/auth.go new file mode 100644 index 00000000..4368a88f --- /dev/null +++ b/proxy/auth.go @@ -0,0 +1,105 @@ +package proxy + +import ( + "encoding/binary" + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "net" + "strings" + "time" +) + +type AuthClient struct { + conn net.Conn + + magic uint64 + method string + timeout time.Duration +} + +//TODO: reset deadlines after method - ok +func (b *AuthClient) sendAndReceiveAuth() error { + //TODO: retrieve from plugin (with timeout) + data := "my-test-jwt-token" + + length := len(b.method) + 1 + len(data) + // 8 - bytes magic, 4 bytes length + buf := make([]byte, 12+length) + binary.BigEndian.PutUint64(buf[:8], b.magic) + binary.BigEndian.PutUint32(buf[8:], uint32(length)) + copy(buf[12:], []byte(b.method+"\x00"+data)) + + err := b.conn.SetDeadline(time.Now().Add(b.timeout)) + if err != nil { + return err + } + _, err = b.conn.Write(buf) + if err != nil { + return errors.Wrap(err, "Failed to write gateway handshake") + } + + header := make([]byte, 4) + _, err = io.ReadFull(b.conn, header) + // If the credentials are valid, we would get a 4 byte response filled with null characters. + // Otherwise, the broker closes the connection and we get an EOF + if err != nil { + if err == io.EOF { + return errors.New("Gateway auth failed") + } + return errors.Wrap(err, "Failed to read response while gateway authenticating") + } + return nil +} + +type AuthServer struct { + conn net.Conn + + magic uint64 + method string + timeout time.Duration +} + +//TODO: reset deadlines after method - ok +func (b *AuthServer) receiveAndSendAuth() error { + err := b.conn.SetDeadline(time.Now().Add(b.timeout)) + if err != nil { + return err + } + headerBuf := make([]byte, 12) // magic 8 + length 4 + _, err = io.ReadFull(b.conn, headerBuf) + if err != nil { + return errors.Wrap(err, "Failed to read gateway bytes magic") + } + + magic := binary.BigEndian.Uint64(headerBuf[:8]) + if magic != b.magic { + return errors.Wrap(err, "gateway handshake magic bytes mismatch") + } + + length := binary.BigEndian.Uint32(headerBuf[8:]) + payload := make([]byte, length-4) + _, err = io.ReadFull(b.conn, payload) + if err != nil { + return errors.Wrap(err, "Failed to read gateway handshake payload") + } + tokens := strings.Split(string(payload), "\x00") + if len(tokens) != 2 { + return fmt.Errorf("invalid gateway handshake: expected 2 tokens, got %d", len(tokens)) + } + if tokens[0] != b.method { + return errors.Wrap(err, fmt.Sprintf("gateway handshake method mismatch: expected %s , got %s", b.method, tokens[0])) + } + data := tokens[1] + // TODO: use data for authentication + _ = data + + logrus.Infof("gateway handshake payload: %s", data) + + header := make([]byte, 4) + if _, err := b.conn.Write(header); err != nil { + return err + } + return nil +} diff --git a/proxy/client.go b/proxy/client.go index 59424abd..4bc2d90c 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -3,7 +3,7 @@ package proxy import ( "crypto/tls" "github.com/grepplabs/kafka-proxy/config" - "github.com/grepplabs/kafka-proxy/plugin/auth/shared" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" "github.com/pkg/errors" "github.com/sirupsen/logrus" "net" @@ -62,8 +62,8 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne ResponseBufferSize: c.Proxy.ResponseBufferSize, ReadTimeout: c.Kafka.ReadTimeout, WriteTimeout: c.Kafka.WriteTimeout, - ListenerAuth: c.Proxy.Auth.Enable, - ListenerAuthenticator: passwordAuthenticator, + LocalAuth: c.Auth.Local.Enable, + LocalAuthenticator: passwordAuthenticator, ForbiddenApiKeys: forbiddenApiKeys, }}, nil } @@ -145,6 +145,21 @@ func (c *Client) dial(brokerAddress string, tlsConfig *tls.Config) (net.Conn, er } func (c *Client) auth(conn net.Conn) error { + if c.config.Auth.Gateway.Client.Enable { + authClient := AuthClient{ + conn: conn, + magic: c.config.Auth.Gateway.Client.Magic, + method: c.config.Auth.Gateway.Client.Method, + timeout: c.config.Auth.Gateway.Client.Timeout, + } + if err := authClient.sendAndReceiveAuth(); err != nil { + conn.Close() + return err + } + if err := conn.SetDeadline(time.Time{}); err != nil { + return err + } + } if c.config.Kafka.SASL.Enable { saslPlainAuth := SASLPlainAuth{ conn: conn, diff --git a/proxy/processor.go b/proxy/processor.go index 2d5fe9d2..95f15300 100644 --- a/proxy/processor.go +++ b/proxy/processor.go @@ -6,7 +6,7 @@ import ( "errors" "fmt" "github.com/grepplabs/kafka-proxy/config" - "github.com/grepplabs/kafka-proxy/plugin/auth/shared" + "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" "github.com/grepplabs/kafka-proxy/proxy/protocol" "io" "strconv" @@ -31,8 +31,8 @@ type ProcessorConfig struct { ResponseBufferSize int WriteTimeout time.Duration ReadTimeout time.Duration - ListenerAuth bool - ListenerAuthenticator shared.PasswordAuthenticator + LocalAuth bool + LocalAuthenticator shared.PasswordAuthenticator ForbiddenApiKeys map[int16]struct{} } @@ -44,8 +44,8 @@ type processor struct { writeTimeout time.Duration readTimeout time.Duration - listenerAuth bool - listenerAuthenticator shared.PasswordAuthenticator + localAuth bool + localAuthenticator shared.PasswordAuthenticator forbiddenApiKeys map[int16]struct{} // metrics @@ -81,12 +81,12 @@ func newProcessor(cfg ProcessorConfig, brokerAddress string) *processor { readTimeout: readTimeout, writeTimeout: writeTimeout, brokerAddress: brokerAddress, - listenerAuth: cfg.ListenerAuth, - listenerAuthenticator: cfg.ListenerAuthenticator, + localAuth: cfg.LocalAuth, + localAuthenticator: cfg.LocalAuthenticator, forbiddenApiKeys: cfg.ForbiddenApiKeys, } } -func (p *processor) localSasl(dst DeadlineWriter, src DeadlineReader) (err error) { +func (p *processor) doLocalSasl(dst DeadlineWriter, src DeadlineReader) (err error) { requestDeadline := time.Now().Add(p.readTimeout) err = dst.SetWriteDeadline(requestDeadline) if err != nil { @@ -150,7 +150,7 @@ func (p *processor) localSasl(dst DeadlineWriter, src DeadlineReader) (err error return saslResult } -func (p *processor) localAuth(dst DeadlineWriter, src DeadlineReader) (err error) { +func (p *processor) doLocalAuth(dst DeadlineWriter, src DeadlineReader) (err error) { requestDeadline := time.Now().Add(p.readTimeout) err = dst.SetWriteDeadline(requestDeadline) if err != nil { @@ -180,12 +180,12 @@ func (p *processor) localAuth(dst DeadlineWriter, src DeadlineReader) (err error if len(tokens) != 3 { return fmt.Errorf("invalid SASL/PLAIN request: expected 3 tokens, got %d", len(tokens)) } - if p.listenerAuthenticator == nil { + if p.localAuthenticator == nil { return protocol.PacketDecodingError{Info: "Listener authenticator is not set"} } // logrus.Infof("user: %s , password: %s", tokens[1], tokens[2]) - ok, status, err := p.listenerAuthenticator.Authenticate(tokens[1], tokens[2]) + ok, status, err := p.localAuthenticator.Authenticate(tokens[1], tokens[2]) if err != nil { proxyLocalAuthTotal.WithLabelValues("error", "1").Inc() return err @@ -206,11 +206,11 @@ func (p *processor) localAuth(dst DeadlineWriter, src DeadlineReader) (err error func (p *processor) RequestsLoop(dst DeadlineWriter, src DeadlineReaderWriter) (readErr bool, err error) { - if p.listenerAuth { - if err = p.localSasl(src, src); err != nil { + if p.localAuth { + if err = p.doLocalSasl(src, src); err != nil { return true, err } - if err = p.localAuth(src, src); err != nil { + if err = p.doLocalAuth(src, src); err != nil { return true, err } } diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index c2007188..91f8a77c 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -2,7 +2,7 @@ package protocol import ( "fmt" - "github.com/kataras/go-errors" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "strings" "testing"