Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate RPC to ConnectRPC #703

Merged
merged 33 commits into from
Dec 15, 2023
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0fb6b27
Change protobuf configuration
krapie Oct 31, 2023
ea11372
Chnage yorkie rpc server to connect rpc server
krapie Oct 31, 2023
84a33a6
Change interceptors and helper
krapie Oct 31, 2023
7b37ddc
Update protobuf related codes
krapie Nov 2, 2023
60590f3
Update client codes
krapie Nov 2, 2023
83cf1ea
Update server test
krapie Nov 4, 2023
e177697
Update admin server codes
krapie Nov 28, 2023
afd02b6
Update go.mod for upstream sync
krapie Nov 28, 2023
8f35b11
Update rpc ports to http ports
krapie Nov 29, 2023
e30cf95
Update client & admin codes for bench test
krapie Dec 2, 2023
1eee993
Update secondary helper and interceptor
krapie Dec 2, 2023
31775d6
Update watch stream client codes
krapie Dec 2, 2023
2833375
Update client test
krapie Dec 2, 2023
75d29cc
Lint codes
krapie Dec 2, 2023
3329871
Update tests
krapie Dec 4, 2023
b2a6ff8
Update rpc server related tests
krapie Dec 7, 2023
1f22ad8
Revise client interceptor for streaming
krapie Dec 7, 2023
464e53f
Revise client mock test
krapie Dec 7, 2023
af64113
Refactor server init code
krapie Dec 10, 2023
3b4e3eb
Remove grpc-prometheus metrics & cleanup rpc server codes
krapie Dec 10, 2023
0381041
Add rpc server handled counter metrics
krapie Dec 11, 2023
c97c1bf
Update error detail logic
krapie Dec 11, 2023
f0a8011
Update client & server configuration
krapie Dec 11, 2023
5e631a6
Lint codes
krapie Dec 11, 2023
4f23378
Update tools
krapie Dec 11, 2023
1a38601
Revise CORS
krapie Dec 11, 2023
56fb0eb
Update overall codes
krapie Dec 12, 2023
c93f27f
Clean up converter
hackerwins Dec 13, 2023
b3bec3d
Revise server code and remove gRPC-related flags
hackerwins Dec 14, 2023
2f30459
Simplify converter interface
hackerwins Dec 15, 2023
26f7370
Change Client.Watch to wait initialization response
hackerwins Dec 15, 2023
025b5db
Recover missing headers from watch stream
hackerwins Dec 15, 2023
34795af
Revise codes
hackerwins Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revise server code and remove gRPC-related flags
hackerwins committed Dec 14, 2023
commit b3bec3db83f4a3b0fa90c73fd2ca3e206ef1d1d7
18 changes: 0 additions & 18 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
@@ -181,24 +181,6 @@ func init() {
"",
"RPC key file's path",
)
cmd.Flags().Uint64Var(
&conf.RPC.MaxRequestBytes,
"rpc-max-requests-bytes",
server.DefaultRPCMaxRequestsBytes,
"Maximum client request size in bytes the server will accept.",
)
cmd.Flags().StringVar(
&conf.RPC.MaxConnectionAge,
"rpc-max-connection-age",
server.DefaultRPCMaxConnectionAge.String(),
"Maximum duration of connection may exist before it will be closed by sending a GoAway.",
)
cmd.Flags().StringVar(
&conf.RPC.MaxConnectionAgeGrace,
"rpc-max-connection-age-grace",
server.DefaultRPCMaxConnectionAgeGrace.String(),
"Additional grace period after MaxConnectionAge after which connections will be forcibly closed.",
)
cmd.Flags().IntVar(
&conf.Profiling.Port,
"profiling-port",
12 changes: 0 additions & 12 deletions server/config.go
Original file line number Diff line number Diff line change
@@ -140,18 +140,6 @@ func (c *Config) ensureDefaultValue() {
c.RPC.Port = DefaultRPCPort
}

if c.RPC.MaxRequestBytes == 0 {
c.RPC.MaxRequestBytes = DefaultRPCMaxRequestsBytes
}

if c.RPC.MaxConnectionAge == "" {
c.RPC.MaxConnectionAge = DefaultRPCMaxConnectionAge.String()
}

if c.RPC.MaxConnectionAgeGrace == "" {
c.RPC.MaxConnectionAgeGrace = DefaultRPCMaxConnectionAgeGrace.String()
}

if c.Profiling.Port == 0 {
c.Profiling.Port = DefaultProfilingPort
}
32 changes: 0 additions & 32 deletions server/rpc/config.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"os"
"time"
)

var (
@@ -30,10 +29,6 @@ var (
ErrInvalidCertFile = errors.New("invalid cert file for RPC server")
// ErrInvalidKeyFile occurs when the key file is invalid.
ErrInvalidKeyFile = errors.New("invalid key file for RPC server")
// ErrInvalidMaxConnectionAge occurs when the max connection age is invalid.
ErrInvalidMaxConnectionAge = errors.New("invalid max connection age for RPC server")
// ErrInvalidMaxConnectionAgeGrace occurs when the max connection age grace is invalid.
ErrInvalidMaxConnectionAgeGrace = errors.New("invalid max connection age grace for RPC server")
)

// Config is the configuration for creating a Server instance.
@@ -46,17 +41,6 @@ type Config struct {

// KeyFile is the path to the key file.
KeyFile string `yaml:"KeyFile"`

// MaxRequestBytes is the maximum client request size in bytes the server will accept.
MaxRequestBytes uint64 `yaml:"MaxRequestBytes"`

// MaxConnectionAge is a duration for the maximum amount of time a connection may exist
// before it will be closed by sending a GoAway.
MaxConnectionAge string `yaml:"MaxConnectionAge"`

// MaxConnectionAgeGrace is a duration for the amount of time after receiving a GoAway
// for pending RPCs to complete before forcibly closing connections.
MaxConnectionAgeGrace string `yaml:"MaxConnectionAgeGrace"`
}

// Validate validates the port number and the files for certification.
@@ -78,21 +62,5 @@ func (c *Config) Validate() error {
}
}

if _, err := time.ParseDuration(c.MaxConnectionAge); err != nil {
return fmt.Errorf(
"%s: %w",
c.MaxConnectionAge,
ErrInvalidMaxConnectionAge,
)
}

if _, err := time.ParseDuration(c.MaxConnectionAgeGrace); err != nil {
return fmt.Errorf(
"%s: %w",
c.MaxConnectionAgeGrace,
ErrInvalidMaxConnectionAgeGrace,
)
}

return nil
}
98 changes: 34 additions & 64 deletions server/rpc/server.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,9 @@ package rpc

import (
"context"
"errors"
"fmt"
"math"
"net/http"
"time"

@@ -41,7 +43,6 @@ import (
// Server is a normal server that processes the logic requested by the client.
type Server struct {
conf *Config
serverMux *http.ServeMux
httpServer *http.Server
yorkieServiceCancel context.CancelFunc
tokenManager *auth.TokenManager
@@ -54,36 +55,36 @@ func NewServer(conf *Config, be *backend.Backend) (*Server, error) {
be.Config.ParseAdminTokenDuration(),
)

interceptor := connect.WithInterceptors(
connecthelper.NewLoggingInterceptor(),
interceptors.NewAdminAuthInterceptor(be, tokenManager),
interceptors.NewContextInterceptor(be),
interceptors.NewDefaultInterceptor(),
)

// TODO(krapie): find corresponding http/net server configurations that matches with gRPC server options
opts := []connect.HandlerOption{
connect.WithInterceptors(
connecthelper.NewLoggingInterceptor(),
interceptors.NewAdminAuthInterceptor(be, tokenManager),
interceptors.NewContextInterceptor(be),
interceptors.NewDefaultInterceptor(),
),
}

yorkieServiceCtx, yorkieServiceCancel := context.WithCancel(context.Background())

serverMux := http.NewServeMux()
serverMux.Handle(v1connect.NewYorkieServiceHandler(
newYorkieServer(yorkieServiceCtx, be),
interceptor,
))
serverMux.Handle(v1connect.NewAdminServiceHandler(
newAdminServer(be, tokenManager),
interceptor,
))
serverMux.Handle(grpchealth.NewHandler(grpchealth.NewStaticChecker(
mux := http.NewServeMux()
mux.Handle(v1connect.NewYorkieServiceHandler(newYorkieServer(yorkieServiceCtx, be), opts...))
mux.Handle(v1connect.NewAdminServiceHandler(newAdminServer(be, tokenManager), opts...))
mux.Handle(grpchealth.NewHandler(grpchealth.NewStaticChecker(
grpchealth.HealthV1ServiceName,
v1connect.YorkieServiceName,
v1connect.AdminServiceName,
)))

// TODO(hackerwins): We need to provide proper http server configuration.
return &Server{
conf: conf,
serverMux: serverMux,
httpServer: &http.Server{Addr: fmt.Sprintf(":%d", conf.Port)},
conf: conf,
httpServer: &http.Server{
Addr: fmt.Sprintf(":%d", conf.Port),
Handler: h2c.NewHandler(newCORS().Handler(mux),
&http2.Server{
MaxConcurrentStreams: math.MaxUint32,
},
),
},
yorkieServiceCancel: yorkieServiceCancel,
}, nil
}
@@ -112,17 +113,15 @@ func (s *Server) Shutdown(graceful bool) {
func (s *Server) listenAndServe() error {
go func() {
logging.DefaultLogger().Infof(fmt.Sprintf("serving RPC on %d", s.conf.Port))
s.httpServer.Handler = h2c.NewHandler(
newCORS().Handler(s.serverMux),
&http2.Server{},
)

if s.conf.CertFile != "" && s.conf.KeyFile != "" {
if err := s.httpServer.ListenAndServeTLS(s.conf.CertFile, s.conf.KeyFile); err != http.ErrServerClosed {
if err := s.httpServer.ListenAndServeTLS(s.conf.CertFile, s.conf.KeyFile); !errors.Is(err, http.ErrServerClosed) {
logging.DefaultLogger().Errorf("HTTP server ListenAndServeTLS: %v", err)
}
return
}
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {

if err := s.httpServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
logging.DefaultLogger().Errorf("HTTP server ListenAndServe: %v", err)
}
return
@@ -140,43 +139,14 @@ func newCORS() *cors.Cors {
http.MethodDelete,
},
AllowOriginFunc: func(origin string) bool {
// TODO(hackerwins): We need to provide a way to configure allow origins in the dashboard.
return true
},
AllowedHeaders: []string{
"Grpc-Timeout",
"Content-Type",
"Keep-Alive",
"User-Agent",
"Cache-Control",
"Content-Type",
"Content-Transfer-Encoding",
"Custom-Header-1",
"Connect-Protocol-Version",
"X-Accept-Content-Transfer-Encoding",
"X-Accept-Response-Streaming",
"X-User-Agent",
"X-Yorkie-User-Agent",
"X-Grpc-Web",
"Authorization",
"X-API-Key",
"X-Shard-Key",
},
MaxAge: int(1728 * time.Second),
ExposedHeaders: []string{
"Accept",
"Accept-Encoding",
"Accept-Post",
"Connect-Accept-Encoding",
"Connect-Content-Encoding",
"Content-Encoding",
"Grpc-Accept-Encoding",
"Grpc-Encoding",
"Grpc-Message",
"Grpc-Status",
"Grpc-Status-Details-Bin",
"X-Custom-Header",
"Custom-Header-1",
},
AllowedHeaders: []string{"*"},
ExposedHeaders: []string{"*"},
// MaxAge indicates how long (in seconds) the results of a preflight request
// can be cached. FF caps this value at 24h, and modern Chrome caps it at 2h.
MaxAge: int(2 * time.Hour / time.Second),
AllowCredentials: true,
})
}
21 changes: 7 additions & 14 deletions server/rpc/server_test.go
Original file line number Diff line number Diff line change
@@ -100,10 +100,7 @@ func TestMain(m *testing.M) {
}

testRPCServer, err = rpc.NewServer(&rpc.Config{
Port: helper.RPCPort,
MaxRequestBytes: helper.RPCMaxRequestBytes,
MaxConnectionAge: helper.RPCMaxConnectionAge.String(),
MaxConnectionAgeGrace: helper.RPCMaxConnectionAgeGrace.String(),
Port: helper.RPCPort,
}, be)
if err != nil {
log.Fatal(err)
@@ -1013,20 +1010,16 @@ func TestConfig_Validate(t *testing.T) {
{config: &rpc.Config{Port: 8080, KeyFile: "noSuchKeyFile"}, expected: rpc.ErrInvalidKeyFile},
// not to use tls
{config: &rpc.Config{
Port: 8080,
CertFile: "",
KeyFile: "",
MaxConnectionAge: "50s",
MaxConnectionAgeGrace: "10s",
Port: 8080,
CertFile: "",
KeyFile: "",
},
expected: nil},
// pass any file existing
{config: &rpc.Config{
Port: 8080,
CertFile: "server_test.go",
KeyFile: "server_test.go",
MaxConnectionAge: "50s",
MaxConnectionAgeGrace: "10s",
Port: 8080,
CertFile: "server_test.go",
KeyFile: "server_test.go",
},
expected: nil},
}
1 change: 0 additions & 1 deletion test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ var defaultServer *server.Yorkie

func startDefaultServer() {
config := helper.TestConfig()
config.RPC.MaxRequestBytes = uint64(10 * 1024 * 1024)
svr, err := server.New(config)
if err != nil {
logging.DefaultLogger().Fatal(err)
10 changes: 2 additions & 8 deletions test/helper/helper.go
Original file line number Diff line number Diff line change
@@ -49,10 +49,7 @@ var testStartedAt int64

// Below are the values of the Yorkie config used in the test.
var (
RPCPort = 21101
RPCMaxRequestBytes = uint64(4 * 1024 * 1024)
RPCMaxConnectionAge = 8 * gotime.Second
RPCMaxConnectionAgeGrace = 2 * gotime.Second
RPCPort = 21101

ProfilingPort = 21102

@@ -200,10 +197,7 @@ func TestConfig() *server.Config {
portOffset += 100
return &server.Config{
RPC: &rpc.Config{
Port: RPCPort + portOffset,
MaxRequestBytes: RPCMaxRequestBytes,
MaxConnectionAge: RPCMaxConnectionAge.String(),
MaxConnectionAgeGrace: RPCMaxConnectionAgeGrace.String(),
Port: RPCPort + portOffset,
},
Profiling: &profiling.Config{
Port: ProfilingPort + portOffset,