diff --git a/README.md b/README.md index e9971b3a..ceda0319 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ See: --dynamic-listeners-disable Disable dynamic listeners. --external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started --forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics + --forward-proxy string URL of the forward proxy. Supported schemas are http and socks5 -h, --help help for server --http-disable Disable HTTP endpoints --http-health-path string Path on which to health endpoint (default "/health") @@ -121,9 +122,6 @@ See: --sasl-jaas-config-file string Location of JAAS config file with SASL username and password --sasl-password string SASL user password --sasl-username string SASL user name - --socks5-address string Address of SOCKS5 proxy to connect through when connecting to kafka brokers - --socks5-password string Password for SOCKS5 proxy Username/Password Authentication - --socks5-username string Username for SOCKS5 proxy Username/Password Authentication --tls-ca-chain-cert-file string PEM encoded CA's certificate file --tls-client-cert-file string PEM encoded file with client certificate --tls-client-key-file string PEM encoded file with private key for the client certificate @@ -212,17 +210,51 @@ Authentication between Kafka Proxy Client and Kafka Proxy Server with Google-ID ### Connect to Kafka through SOCKS5 Proxy example +Connect through test SOCKS5 Proxy server + +``` + kafka-proxy tools socks5-proxy --addr localhost:1080 + kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" - --socks5-address localhost:1080 + --forward-proxy socks5://localhost:1080 + +``` + +``` + kafka-proxy tools socks5-proxy --addr localhost:1080 --username my-proxy-user --password my-proxy-password kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \ - --socks5-address localhost:1080 \ - --socks5-username my-proxy-user \ - --socks5-password my-proxy-password + --forward-proxy socks5://my-proxy-user:my-proxy-password@localhost:1080 + +``` + +### Connect to Kafka through HTTP Proxy example + +Connect through test HTTP Proxy server using CONNECT method + +``` + kafka-proxy tools http-proxy --addr localhost:3128 + + kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ + --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ + --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" + --forward-proxy http://localhost:3128 + +``` + +``` + kafka-proxy tools http-proxy --addr localhost:3128 --username my-proxy-user --password my-proxy-password + + kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ + --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ + --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \ + --forward-proxy http://my-proxy-user:my-proxy-password@localhost:3128 + +``` ### Kubernetes sidecar container example diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 790ba964..508ded63 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -149,10 +149,8 @@ func init() { Server.Flags().StringVar(&c.Log.Format, "log-format", "text", "Log format text or json") Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level debug, info, warning, error, fatal or panic") - // Socks5 to Kafka - Server.Flags().StringVar(&c.Socks5.ProxyAddress, "socks5-address", "", "Address of SOCKS5 proxy to connect through when connecting to kafka brokers") - Server.Flags().StringVar(&c.Socks5.Username, "socks5-username", "", "Username for SOCKS5 proxy Username/Password Authentication") - Server.Flags().StringVar(&c.Socks5.Password, "socks5-password", "", "Password for SOCKS5 proxy Username/Password Authentication") + // Connect through Socks5 or HTTP CONNECT to Kafka + Server.Flags().StringVar(&c.ForwardProxy.Url, "forward-proxy", "", "URL of the forward proxy. Supported schemas are socks5 and http") } func Run(_ *cobra.Command, _ []string) { diff --git a/cmd/tools/tools.go b/cmd/tools/tools.go index 05c9fa94..6053e007 100644 --- a/cmd/tools/tools.go +++ b/cmd/tools/tools.go @@ -48,7 +48,7 @@ func httpProxyServer(cmd *cobra.Command, _ []string) error { proxy := goproxy.NewProxyHttpServer() proxy.Verbose = verbose if username != "" && password != "" { - logrus.Info("HTTP proxy will require basic authentication") + logrus.Info("HTTP proxy will require Basic Proxy-Authorization for CONNECT") proxy.OnRequest().HandleConnect(auth.BasicConnect("", func(user, passwd string) bool { return user == username && passwd == password @@ -65,12 +65,8 @@ func socks5ProxyServer(cmd *cobra.Command, _ []string) error { addr, _ := cmd.Flags().GetString("addr") conf := &socks5.Config{} - server, err := socks5.New(conf) - if err != nil { - return err - } if username != "" && password != "" { - logrus.Info("SOCKS5 proxy will require basic authentication", addr) + logrus.Info("SOCKS5 proxy will require Username/Password Authentication") authenticator := &socks5.UserPassAuthenticator{ Credentials: socks5ProxyCredentials{ @@ -80,7 +76,10 @@ func socks5ProxyServer(cmd *cobra.Command, _ []string) error { } conf.AuthMethods = []socks5.Authenticator{authenticator} } - + server, err := socks5.New(conf) + if err != nil { + return err + } logrus.Infof("Starting SOCKS5 proxy server on %s", addr) return server.ListenAndServe("tcp", addr) } diff --git a/config/config.go b/config/config.go index 28148569..51cb1b47 100644 --- a/config/config.go +++ b/config/config.go @@ -5,6 +5,7 @@ import ( "github.com/grepplabs/kafka-proxy/pkg/libs/util" "github.com/pkg/errors" "net" + "net/url" "strings" "time" ) @@ -120,10 +121,13 @@ type Config struct { JaasConfigFile string } } - Socks5 struct { - ProxyAddress string - Username string - Password string + ForwardProxy struct { + Url string + + Scheme string + Address string + Username string + Password string } } @@ -266,21 +270,32 @@ func (c *Config) Validate() error { if c.Auth.Gateway.Server.Enable && c.Auth.Gateway.Server.Timeout <= 0 { return errors.New("Auth.Gateway.Server.Timeout must be greater than 0") } - if c.Socks5.ProxyAddress == "" && (c.Socks5.Username != "" || c.Socks5.Password != "") { - return errors.New("Socks5.ProxyAddress must not be empty when Socks5 Username/Password is provided") - } - if (c.Socks5.Username != "" && c.Socks5.Password == "") || (c.Socks5.Username == "" && c.Socks5.Password != "") { - return errors.New("Both Socks5 Username and Password must be provided provided") - } - if len(c.Socks5.Username) > 255 || len(c.Socks5.Password) > 255 { - // RFC1929 - return errors.New("Max length of Socks5 Username/Password is 255 chars") - } - if c.Socks5.ProxyAddress != "" { - if _, _, err := util.SplitHostPort(c.Socks5.ProxyAddress); err != nil { + // http://username:password@hostname:port or socks5://username:password@hostname:port + if c.ForwardProxy.Url != "" { + var proxyUrl *url.URL + var err error + if proxyUrl, err = url.Parse(c.ForwardProxy.Url); err != nil { return err } - } + if proxyUrl.Port() == "" { + return errors.New("Port part of ForwardProxy.Url must not be empty") + } + c.ForwardProxy.Address = proxyUrl.Host + if proxyUrl.Scheme != "http" && proxyUrl.Scheme != "socks5" { + return errors.New("ForwardProxy.Url Scheme must be http or socks5") + } + c.ForwardProxy.Scheme = proxyUrl.Scheme + + if proxyUrl.User != nil { + password, _ := proxyUrl.User.Password() + if proxyUrl.User.Username() == "" || password == "" { + return errors.New("Both ForwardProxy Url Username and Password must be provided") + } + c.ForwardProxy.Username = proxyUrl.User.Username() + c.ForwardProxy.Password = password + } + + } return nil } diff --git a/proxy/client.go b/proxy/client.go index 4116374b..cc676ec1 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -109,24 +109,38 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne } func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) { + directDialer := directDialer{ + dialTimeout: c.Kafka.DialTimeout, + keepAlive: c.Kafka.KeepAlive, + } + var rawDialer Dialer - if c.Socks5.ProxyAddress != "" { - logrus.Infof("Kafka clients will connect through the SOCKS5 proxy %s", c.Socks5.ProxyAddress) - rawDialer = &socks5Dialer{ - directDialer: directDialer{ - dialTimeout: c.Kafka.DialTimeout, - keepAlive: c.Kafka.KeepAlive, - }, - proxyNetwork: "tcp", - proxyAddr: c.Socks5.ProxyAddress, - username: c.Socks5.Username, - password: c.Socks5.Password, + if c.ForwardProxy.Url != "" { + switch c.ForwardProxy.Scheme { + case "socks5": + logrus.Infof("Kafka clients will connect through the SOCKS5 proxy %s", c.ForwardProxy.Address) + rawDialer = &socks5Dialer{ + directDialer: directDialer, + proxyNetwork: "tcp", + proxyAddr: c.ForwardProxy.Address, + username: c.ForwardProxy.Username, + password: c.ForwardProxy.Password, + } + case "http": + logrus.Infof("Kafka clients will connect through the HTTP proxy %s using CONNECT", c.ForwardProxy.Address) + + rawDialer = &httpProxy{ + forwardDialer: directDialer, + network: "tcp", + hostPort: c.ForwardProxy.Address, + username: c.ForwardProxy.Username, + password: c.ForwardProxy.Password, + } + default: + return nil, errors.New("Only http or socks5 proxy is supported") } } else { - rawDialer = directDialer{ - dialTimeout: c.Kafka.DialTimeout, - keepAlive: c.Kafka.KeepAlive, - } + rawDialer = directDialer } if c.Kafka.TLS.Enable { if tlsConfig == nil { diff --git a/proxy/dial.go b/proxy/dial.go index f8d48eda..863d27d8 100644 --- a/proxy/dial.go +++ b/proxy/dial.go @@ -139,6 +139,7 @@ func (d tlsDialer) Dial(network, addr string) (net.Conn, error) { type httpProxy struct { forwardDialer Dialer + network string hostPort string username, password string } @@ -154,10 +155,11 @@ func (s *httpProxy) Dial(network, addr string) (net.Conn, error) { } req.Close = false if s.username != "" && s.password != "" { - req.Header.Set("Proxy-Authorization", base64.StdEncoding.EncodeToString([]byte(s.username+":"+s.password))) + basic := "Basic " + base64.StdEncoding.EncodeToString([]byte(s.username+":"+s.password)) + req.Header.Set("Proxy-Authorization", basic) } - c, err := s.forwardDialer.Dial("tcp", s.hostPort) + c, err := s.forwardDialer.Dial(s.network, s.hostPort) if err != nil { return nil, err } @@ -180,17 +182,3 @@ func (s *httpProxy) Dial(network, addr string) (net.Conn, error) { return c, nil } - -func newHTTPProxy(uri *url.URL, forward Dialer) (Dialer, error) { - s := new(httpProxy) - s.hostPort = uri.Host - if uri.Port() == "" { - return nil, fmt.Errorf("http proxy url doesn't contain a port [%v]", uri) - } - s.forwardDialer = forward - if uri.User != nil { - s.username = uri.User.Username() - s.password, _ = uri.User.Password() - } - return s, nil -}