Skip to content

Commit

Permalink
Add gateway auth
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Mar 13, 2018
1 parent 7ad8fa7 commit 07077c4
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 47 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ release: clean build.linux build/osx/$(BINARY)
github-release upload -u grepplabs -r $(BINARY) -t $(TAG) -f build/osx/$(BINARY) -n darwin/amd64/$(BINARY)
github-release info -u grepplabs -r $(BINARY)

protoc.auth:
protoc -I plugin/auth/proto/ plugin/auth/proto/auth.proto --go_out=plugins=grpc:plugin/auth/proto/
protoc.local-auth:
protoc -I plugin/local-auth/proto/ plugin/local-auth/proto/auth.proto --go_out=plugins=grpc:plugin/local-auth/proto/

plugin.auth-user:
CGO_ENABLED=0 go build -o build/auth-user $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-user/main.go
Expand Down
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ See:
--proxy-listener-cert-file "server-cert.pem" \
--proxy-listener-ca-chain-cert-file "ca.pem" \
--proxy-listener-tls-enable \
--proxy-listener-auth-enable \
--proxy-listener-auth-command build/auth-user \
--proxy-listener-auth-param "--username=my-test-user" \
--proxy-listener-auth-param "--password=my-test-password"
--auth-local-enable \
--auth-local-command build/auth-user \
--auth-local-param "--username=my-test-user" \
--auth-local-param "--password=my-test-password"
make clean build plugin.auth-ldap && build/kafka-proxy server \
--proxy-listener-auth-enable \
--proxy-listener-auth-command build/auth-ldap \
--proxy-listener-auth-param "--url=ldaps://ldap.example.com:636" \
--proxy-listener-auth-param "--user-dn=cn=users,dc=exemple,dc=com" \
--proxy-listener-auth-param "--user-attr=uid" \
--auth-local-enable \
--auth-local-command build/auth-ldap \
--auth-local-param "--url=ldaps://ldap.example.com:636" \
--auth-local-param "--user-dn=cn=users,dc=exemple,dc=com" \
--auth-local-param "--user-attr=uid" \
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
Expand Down
38 changes: 27 additions & 11 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"

"errors"
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
"github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
)
Expand Down Expand Up @@ -75,11 +75,27 @@ func init() {
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", "", "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file")

// authentication plugin
Server.Flags().BoolVar(&c.Proxy.Auth.Enable, "proxy-listener-auth-enable", false, "Enable SASL/PLAIN listener authentication")
Server.Flags().StringVar(&c.Proxy.Auth.Command, "proxy-listener-auth-command", "", "Path to authentication plugin binary")
Server.Flags().StringArrayVar(&c.Proxy.Auth.Parameters, "proxy-listener-auth-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Proxy.Auth.LogLevel, "proxy-listener-auth-log-level", "trace", "Log level of the auth plugin")
// local authentication plugin
Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers")
Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary")
Server.Flags().StringArrayVar(&c.Auth.Local.Parameters, "auth-local-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Auth.Local.LogLevel, "auth-local-log-level", "trace", "Log level of the auth plugin")

Server.Flags().BoolVar(&c.Auth.Gateway.Client.Enable, "auth-gateway-client-enable", false, "Enable gateway client authentication")
Server.Flags().StringVar(&c.Auth.Gateway.Client.Command, "auth-gateway-client-command", "", "Path to authentication plugin binary")
Server.Flags().StringArrayVar(&c.Auth.Gateway.Client.Parameters, "auth-gateway-client-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Auth.Gateway.Client.LogLevel, "auth-gateway-client-log-level", "trace", "Log level of the auth plugin")
Server.Flags().StringVar(&c.Auth.Gateway.Client.Method, "auth-gateway-client-method", "", "Authentication method")
Server.Flags().Uint64Var(&c.Auth.Gateway.Client.Magic, "auth-gateway-client-magic", 0, "Magic bytes sent in the handshake")
Server.Flags().DurationVar(&c.Auth.Gateway.Client.Timeout, "auth-gateway-client-timeout", 10*time.Second, "Authentication timeout")

Server.Flags().BoolVar(&c.Auth.Gateway.Server.Enable, "auth-gateway-server-enable", false, "Enable proxy server authentication")
Server.Flags().StringVar(&c.Auth.Gateway.Server.Command, "auth-gateway-server-command", "", "Path to authentication plugin binary")
Server.Flags().StringArrayVar(&c.Auth.Gateway.Server.Parameters, "auth-gateway-server-param", []string{}, "Authentication plugin parameter")
Server.Flags().StringVar(&c.Auth.Gateway.Server.LogLevel, "auth-gateway-server-log-level", "trace", "Log level of the auth plugin")
Server.Flags().StringVar(&c.Auth.Gateway.Server.Method, "auth-gateway-server-method", "", "Authentication method")
Server.Flags().Uint64Var(&c.Auth.Gateway.Server.Magic, "auth-gateway-server-magic", 0, "Magic bytes sent in the handshake")
Server.Flags().DurationVar(&c.Auth.Gateway.Server.Timeout, "auth-gateway-server-timeout", 10*time.Second, "Authentication timeout")

// kafka
Server.Flags().StringVar(&c.Kafka.ClientID, "kafka-client-id", "kafka-proxy", "An optional identifier to track the source of requests")
Expand Down Expand Up @@ -127,8 +143,8 @@ func Run(_ *cobra.Command, _ []string) {
logrus.Infof("Starting kafka-proxy version %s", config.Version)

var passwordAuthenticator shared.PasswordAuthenticator
if c.Proxy.Auth.Enable {
client := NewPluginClient()
if c.Auth.Local.Enable {
client := NewLocalAuthPluginClient()
defer client.Kill()

rpcClient, err := client.Client()
Expand Down Expand Up @@ -257,14 +273,14 @@ func SetLogger() {
logrus.SetLevel(level)
}

func NewPluginClient() *plugin.Client {
func NewLocalAuthPluginClient() *plugin.Client {
jsonFormat := false
if c.Log.Format == "json" {
jsonFormat = true
}
logger := hclog.New(&hclog.LoggerOptions{
Output: os.Stdout,
Level: hclog.LevelFromString(c.Proxy.Auth.LogLevel),
Level: hclog.LevelFromString(c.Auth.Local.LogLevel),
Name: "plugin",
JSONFormat: jsonFormat,
TimeFormat: time.RFC3339,
Expand All @@ -274,7 +290,7 @@ func NewPluginClient() *plugin.Client {
HandshakeConfig: shared.Handshake,
Plugins: shared.PluginMap,
Logger: logger,
Cmd: exec.Command(c.Proxy.Auth.Command, c.Proxy.Auth.Parameters...),
Cmd: exec.Command(c.Auth.Local.Command, c.Auth.Local.Parameters...),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC},
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin-auth-ldap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"crypto/tls"
"flag"
"fmt"
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
"github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin-auth-user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"flag"
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
"github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
"os"
Expand Down
41 changes: 38 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,34 @@ type Config struct {
ListenerKeyPassword string
CAChainCertFile string
}
Auth struct {
}
Auth struct {
Local struct {
Enable bool
Command string
Parameters []string
LogLevel string
}
Gateway struct {
Client struct {
Enable bool
Method string
Magic uint64
Command string
Parameters []string
LogLevel string
Timeout time.Duration
}
Server struct {
Enable bool
Method string
Magic uint64
Command string
Parameters []string
LogLevel string
Timeout time.Duration
}
}
}
Kafka struct {
ClientID string
Expand Down Expand Up @@ -216,8 +238,21 @@ func (c *Config) Validate() error {
if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") {
return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled")
}
if c.Proxy.Auth.Enable && c.Proxy.Auth.Command == "" {
return errors.New("Auth.Command is required when Proxy.Auth is enabled")
if c.Auth.Local.Enable && c.Auth.Local.Command == "" {
return errors.New("Command is required when Auth.Local.Enable is enabled")
}
if c.Auth.Gateway.Client.Enable && (c.Auth.Gateway.Client.Command == "" || c.Auth.Gateway.Client.Method == "" || c.Auth.Gateway.Client.Magic == 0) {
return errors.New("Command, Method and Magic are required when Auth.Gateway.Client.Enable is enabled")
}
if c.Auth.Gateway.Client.Enable && c.Auth.Gateway.Client.Timeout <= 0 {
return errors.New("Auth.Gateway.Client.Timeout must be greater than 0")
}

if c.Auth.Gateway.Server.Enable && (c.Auth.Gateway.Server.Command == "" || c.Auth.Gateway.Server.Method == "" || c.Auth.Gateway.Server.Magic == 0) {
return errors.New("Command, Method and Magic are required when Auth.Gateway.Server.Enable is enabled")
}
if c.Auth.Gateway.Server.Enable && c.Auth.Gateway.Server.Timeout <= 0 {
return errors.New("Auth.Gateway.Server.Timeout must be greater than 0")
}
return nil
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package shared

import (
"github.com/grepplabs/kafka-proxy/plugin/auth/proto"
"github.com/grepplabs/kafka-proxy/plugin/local-auth/proto"
"github.com/hashicorp/go-plugin"
"golang.org/x/net/context"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/grepplabs/kafka-proxy/plugin/auth/proto"
"github.com/grepplabs/kafka-proxy/plugin/local-auth/proto"
"github.com/hashicorp/go-plugin"
"net/rpc"
)
Expand Down
File renamed without changes.
105 changes: 105 additions & 0 deletions proxy/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package proxy

import (
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net"
"strings"
"time"
)

type AuthClient struct {
conn net.Conn

magic uint64
method string
timeout time.Duration
}

//TODO: reset deadlines after method - ok
func (b *AuthClient) sendAndReceiveAuth() error {
//TODO: retrieve from plugin (with timeout)
data := "my-test-jwt-token"

length := len(b.method) + 1 + len(data)
// 8 - bytes magic, 4 bytes length
buf := make([]byte, 12+length)
binary.BigEndian.PutUint64(buf[:8], b.magic)
binary.BigEndian.PutUint32(buf[8:], uint32(length))
copy(buf[12:], []byte(b.method+"\x00"+data))

err := b.conn.SetDeadline(time.Now().Add(b.timeout))
if err != nil {
return err
}
_, err = b.conn.Write(buf)
if err != nil {
return errors.Wrap(err, "Failed to write gateway handshake")
}

header := make([]byte, 4)
_, err = io.ReadFull(b.conn, header)
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
if err == io.EOF {
return errors.New("Gateway auth failed")
}
return errors.Wrap(err, "Failed to read response while gateway authenticating")
}
return nil
}

type AuthServer struct {
conn net.Conn

magic uint64
method string
timeout time.Duration
}

//TODO: reset deadlines after method - ok
func (b *AuthServer) receiveAndSendAuth() error {
err := b.conn.SetDeadline(time.Now().Add(b.timeout))
if err != nil {
return err
}
headerBuf := make([]byte, 12) // magic 8 + length 4
_, err = io.ReadFull(b.conn, headerBuf)
if err != nil {
return errors.Wrap(err, "Failed to read gateway bytes magic")
}

magic := binary.BigEndian.Uint64(headerBuf[:8])
if magic != b.magic {
return errors.Wrap(err, "gateway handshake magic bytes mismatch")
}

length := binary.BigEndian.Uint32(headerBuf[8:])
payload := make([]byte, length-4)
_, err = io.ReadFull(b.conn, payload)
if err != nil {
return errors.Wrap(err, "Failed to read gateway handshake payload")
}
tokens := strings.Split(string(payload), "\x00")
if len(tokens) != 2 {
return fmt.Errorf("invalid gateway handshake: expected 2 tokens, got %d", len(tokens))
}
if tokens[0] != b.method {
return errors.Wrap(err, fmt.Sprintf("gateway handshake method mismatch: expected %s , got %s", b.method, tokens[0]))
}
data := tokens[1]
// TODO: use data for authentication
_ = data

logrus.Infof("gateway handshake payload: %s", data)

header := make([]byte, 4)
if _, err := b.conn.Write(header); err != nil {
return err
}
return nil
}
21 changes: 18 additions & 3 deletions proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package proxy
import (
"crypto/tls"
"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
"github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"net"
Expand Down Expand Up @@ -62,8 +62,8 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
ResponseBufferSize: c.Proxy.ResponseBufferSize,
ReadTimeout: c.Kafka.ReadTimeout,
WriteTimeout: c.Kafka.WriteTimeout,
ListenerAuth: c.Proxy.Auth.Enable,
ListenerAuthenticator: passwordAuthenticator,
LocalAuth: c.Auth.Local.Enable,
LocalAuthenticator: passwordAuthenticator,
ForbiddenApiKeys: forbiddenApiKeys,
}}, nil
}
Expand Down Expand Up @@ -145,6 +145,21 @@ func (c *Client) dial(brokerAddress string, tlsConfig *tls.Config) (net.Conn, er
}

func (c *Client) auth(conn net.Conn) error {
if c.config.Auth.Gateway.Client.Enable {
authClient := AuthClient{
conn: conn,
magic: c.config.Auth.Gateway.Client.Magic,
method: c.config.Auth.Gateway.Client.Method,
timeout: c.config.Auth.Gateway.Client.Timeout,
}
if err := authClient.sendAndReceiveAuth(); err != nil {
conn.Close()
return err
}
if err := conn.SetDeadline(time.Time{}); err != nil {
return err
}
}
if c.config.Kafka.SASL.Enable {
saslPlainAuth := SASLPlainAuth{
conn: conn,
Expand Down
Loading

0 comments on commit 07077c4

Please sign in to comment.