From c416d95bc8360c58460abc5b12bee200ef1f54bf Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 13 Jan 2025 14:19:55 +0800 Subject: [PATCH] refactor Signed-off-by: Ryan Leung --- cmd/pd-server/main.go | 13 +++-- server/apiv2/handlers/micro_service.go | 8 +-- server/cluster/cluster.go | 33 ++++++------- server/server.go | 49 ++++++++++++------- server/server_test.go | 2 +- tests/cluster.go | 44 +++-------------- tests/integrations/client/client_test.go | 2 +- .../mcs/keyspace/tso_keyspace_group_test.go | 2 +- tests/integrations/mcs/members/member_test.go | 2 +- tests/integrations/mcs/tso/api_test.go | 6 +-- .../mcs/tso/keyspace_group_manager_test.go | 6 +-- tests/integrations/mcs/tso/proxy_test.go | 2 +- tests/integrations/mcs/tso/server_test.go | 46 ++++++++--------- tests/integrations/tso/client_test.go | 4 +- .../apiv2/handlers/tso_keyspace_group_test.go | 2 +- tests/testutil.go | 2 +- .../tests/keyspace/keyspace_group_test.go | 14 +++--- tools/pd-ctl/tests/keyspace/keyspace_test.go | 4 +- 18 files changed, 108 insertions(+), 133 deletions(-) diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 4babe09f495..4b7181db4aa 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -18,6 +18,7 @@ import ( "context" "os" "os/signal" + "strings" "syscall" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -161,11 +162,15 @@ func addFlags(cmd *cobra.Command) { } func createServerWrapper(cmd *cobra.Command, args []string) { - isKeyspaceGroupEnabled := os.Getenv(serviceModeEnv) != "" - start(cmd, args, isKeyspaceGroupEnabled) + mode := os.Getenv(serviceModeEnv) + if len(mode) != 0 && strings.ToLower(mode) == "api" { + start(cmd, args, true) + } else { + start(cmd, args, false) + } } -func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) { +func start(cmd *cobra.Command, args []string, isMultiTimelinesEnabled bool) { schedulers.Register() cfg := config.NewConfig() flagSet := cmd.Flags() @@ -233,7 +238,7 @@ func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) { serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler) } serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...) - svr, err := server.CreateServer(ctx, cfg, isKeyspaceGroupEnabled, serviceBuilders...) + svr, err := server.CreateServer(ctx, cfg, isMultiTimelinesEnabled, serviceBuilders...) if err != nil { log.Fatal("create server failed", errs.ZapError(err)) } diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 954f4f84222..940e9157c75 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -65,12 +65,8 @@ func GetMembers(c *gin.Context) { func GetPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) if service := c.Param("service"); len(service) > 0 { - addr, err := svr.GetServicePrimaryAddr(c.Request.Context(), service) - if err != nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) - return - } - if len(addr) == 0 { + addr, exist := svr.GetServicePrimaryAddr(c.Request.Context(), service) + if !exist { c.AbortWithStatusJSON(http.StatusNotFound, "no primary found") return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 964702a105f..b8682a734d3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -131,7 +131,7 @@ type Server interface { GetMembers() ([]*pdpb.Member, error) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error GetKeyspaceGroupManager() *keyspace.GroupManager - IsKeyspaceGroupEnabled() bool + IsMultiTimelinesEnabled() bool GetSafePointV2Manager() *gc.SafePointV2Manager } @@ -156,12 +156,12 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - isKeyspaceGroupEnabled bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS atomic.Value // Store as uint64 - externalTS atomic.Value // Store as uint64 + running bool + isMultiTimelinesEnabled bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS atomic.Value // Store as uint64 + externalTS atomic.Value // Store as uint64 // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -325,7 +325,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { log.Warn("raft cluster has already been started") return nil } - c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled() + c.isMultiTimelinesEnabled = s.IsMultiTimelinesEnabled() err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { return err @@ -376,14 +376,13 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { c.loadExternalTS() c.loadMinResolvedTS() - if c.isKeyspaceGroupEnabled { - // bootstrap keyspace group manager after starting other parts successfully. - // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. - err = c.keyspaceGroupManager.Bootstrap(c.ctx) - if err != nil { - return err - } + // bootstrap keyspace group manager after starting other parts successfully. + // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. + err = c.keyspaceGroupManager.Bootstrap(c.ctx) + if err != nil { + return err } + c.checkSchedulingService() c.wg.Add(9) go c.runServiceCheckJob() @@ -426,9 +425,9 @@ func (c *RaftCluster) checkSchedulingService() { // If the external TSO service is unavailable, it will switch to the internal TSO service. // // In serverless env, we don't allow dynamic switching. -// Whether we use the internal TSO service or the external TSO service is determined by the `isKeyspaceGroupEnabled`. +// Whether we use the internal TSO service or the external TSO service is determined by the `isMultiTimelinesEnabled`. func (c *RaftCluster) checkTSOService() { - if c.isKeyspaceGroupEnabled { + if c.isMultiTimelinesEnabled { return } if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { diff --git a/server/server.go b/server/server.go index b011aae923f..cfb37cfa141 100644 --- a/server/server.go +++ b/server/server.go @@ -225,7 +225,7 @@ type Server struct { auditBackends []audit.Backend registry *registry.ServiceRegistry - isKeyspaceGroupEnabled bool + isMultiTimelinesEnabled bool servicePrimaryMap sync.Map /* Store as map[string]string */ tsoPrimaryWatcher *etcdutil.LoopWatcher schedulingPrimaryWatcher *etcdutil.LoopWatcher @@ -238,17 +238,17 @@ type Server struct { type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error) // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) { +func CreateServer(ctx context.Context, cfg *config.Config, isMultiTimelinesEnabled bool, legacyServiceBuilders ...HandlerBuilder) (*Server, error) { // TODO: Currently, whether we enable microservice or not is determined by the service list. // It's equal to whether we enable the keyspace group or not. - // But indeed the keyspace group is independent of the microservice. // There could be the following scenarios: // 1. Enable microservice but disable keyspace group. (non-serverless scenario) // 2. Enable microservice and enable keyspace group. (serverless scenario) // 3. Disable microservice and disable keyspace group. (both serverless scenario and non-serverless scenario) + // But for case 1, we enable keyspace group which is misleading because non-serverless don't have keyspace related concept. + // The keyspace group should be independent of the microservice. // We should separate the keyspace group from the microservice later. - isKeyspaceGroupEnabled := len(services) != 0 - log.Info("PD config", zap.Bool("enable-keyspace-group", isKeyspaceGroupEnabled), zap.Reflect("config", cfg)) + log.Info("PD config", zap.Bool("is-multi-timelines-enabled", isMultiTimelinesEnabled), zap.Reflect("config", cfg)) serviceMiddlewareCfg := config.NewServiceMiddlewareConfig() s := &Server{ @@ -260,7 +260,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le ctx: ctx, startTimestamp: time.Now().Unix(), DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - isKeyspaceGroupEnabled: isKeyspaceGroupEnabled, + isMultiTimelinesEnabled: isMultiTimelinesEnabled, tsoClientPool: struct { syncutil.RWMutex clients map[string]tsopb.TSO_TsoClient @@ -479,9 +479,7 @@ func (s *Server) startServer(ctx context.Context) error { Member: s.member.MemberValue(), Step: keyspace.AllocStep, }) - if s.IsKeyspaceGroupEnabled() { - s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) - } + s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, "", s.cluster) @@ -531,9 +529,7 @@ func (s *Server) Close() { s.cgMonitor.StopMonitor() s.stopServerLoop() - if s.IsKeyspaceGroupEnabled() { - s.keyspaceGroupManager.Close() - } + s.keyspaceGroupManager.Close() if s.client != nil { if err := s.client.Close(); err != nil { @@ -787,9 +783,9 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } -// IsKeyspaceGroupEnabled returns whether the keyspace group is enabled. -func (s *Server) IsKeyspaceGroupEnabled() bool { - return s.isKeyspaceGroupEnabled +// IsMultiTimelinesEnabled returns whether the multi-timelines feature is enabled. +func (s *Server) IsMultiTimelinesEnabled() bool { + return s.isMultiTimelinesEnabled } // GetAddr returns the server urls for clients. @@ -1389,13 +1385,28 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { - if s.isKeyspaceGroupEnabled && !s.IsClosed() { - if name == constant.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { + if name == constant.TSOServiceName { + // TSO service is always independent when multi-timelines is enabled. + // Otherwise, it depends on the dynamic switching feature. + // Only serverless env, isMultiTimelinesEnabled is true. + if s.isMultiTimelinesEnabled { + return true + } + if !s.IsClosed() { + if s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { + // If the raft cluster is running, the service check is not executed. + // We return false temporarily. + if s.GetRaftCluster() == nil { + return false + } + return s.cluster.IsServiceIndependent(name) + } + // If the dynamic switching feature is disabled, we only use internal TSO service. return false } - return s.cluster.IsServiceIndependent(name) } - return false + // Other services relies on service discovery. + return s.cluster.IsServiceIndependent(name) } // DirectlyGetRaftCluster returns raft cluster directly. diff --git a/server/server_test.go b/server/server_test.go index 2ad677ad7e5..e44e9ff20c5 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -271,7 +271,7 @@ func TestMode(t *testing.T) { err = svr.Run() re.NoError(err) MustWaitLeader(re, []*Server{svr}) - re.True(svr.IsKeyspaceGroupEnabled()) + re.True(svr.IsMultiTimelinesEnabled()) } func TestIsPathInDirectory(t *testing.T) { diff --git a/tests/cluster.go b/tests/cluster.go index d1bd7fbba48..34ef7749a08 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -36,7 +36,6 @@ import ( "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" - "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/swaggerserver" "github.com/tikv/pd/pkg/tso" @@ -79,7 +78,7 @@ type TestServer struct { var zapLogOnce sync.Once // NewTestServer creates a new TestServer. -func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceGroupEnabled ...bool) (*TestServer, error) { +func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error) { // disable the heartbeat async runner in test cfg.Schedule.EnableHeartbeatConcurrentRunner = false err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) @@ -98,11 +97,7 @@ func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceGroupEnabl serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler) } serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...) - var enableKeyspaceGroup bool - if len(isKeyspaceGroupEnabled) > 0 { - enableKeyspaceGroup = isKeyspaceGroupEnabled[0] - } - svr, err := server.CreateServer(ctx, cfg, enableKeyspaceGroup, serviceBuilders...) + svr, err := server.CreateServer(ctx, cfg, false, serviceBuilders...) if err != nil { return nil, err } @@ -430,15 +425,6 @@ type ConfigOption func(conf *config.Config, serverName string) // NewTestCluster creates a new TestCluster. func NewTestCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { - return createTestCluster(ctx, initialServerCount, false, opts...) -} - -// NewTestClusterWithKeyspaceGroup creates a new TestCluster with keyspace group enabled. -func NewTestClusterWithKeyspaceGroup(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { - return createTestCluster(ctx, initialServerCount, true, opts...) -} - -func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGroupEnabled bool, opts ...ConfigOption) (*TestCluster, error) { schedulers.Register() config := newClusterConfig(initialServerCount) servers := make(map[string]*TestServer) @@ -447,7 +433,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGr if err != nil { return nil, err } - s, err := NewTestServer(ctx, serverConf, isKeyspaceGroupEnabled) + s, err := NewTestServer(ctx, serverConf) if err != nil { return nil, err } @@ -467,11 +453,11 @@ func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGr // RestartTestPDCluster restarts the PD test cluster. func RestartTestPDCluster(ctx context.Context, cluster *TestCluster) (*TestCluster, error) { - return restartTestCluster(ctx, cluster, true) + return restartTestCluster(ctx, cluster) } func restartTestCluster( - ctx context.Context, cluster *TestCluster, isKeyspaceGroupEnabled bool, + ctx context.Context, cluster *TestCluster, ) (newTestCluster *TestCluster, err error) { schedulers.Register() newTestCluster = &TestCluster{ @@ -498,11 +484,7 @@ func restartTestCluster( newServer *TestServer serverErr error ) - if isKeyspaceGroupEnabled { - newServer, serverErr = NewTestServer(ctx, serverCfg, []string{constant.PDServiceName}) - } else { - newServer, serverErr = NewTestServer(ctx, serverCfg, nil) - } + newServer, serverErr = NewTestServer(ctx, serverCfg) serverMap.Store(serverName, newServer) errorMap.Store(serverName, serverErr) }(serverName, server) @@ -733,20 +715,6 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ return s, nil } -// JoinWithKeyspaceGroup is used to add a new TestServer into the cluster with keyspace group enabled. -func (c *TestCluster) JoinWithKeyspaceGroup(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { - conf, err := c.config.join().Generate(opts...) - if err != nil { - return nil, err - } - s, err := NewTestServer(ctx, conf, true) - if err != nil { - return nil, err - } - c.servers[conf.Name] = s - return s, nil -} - // Destroy is used to destroy a TestCluster. func (c *TestCluster) Destroy() { for _, s := range c.servers { diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 6bd25567f81..3c0b9f366fa 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -361,7 +361,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1) re.NoError(err) defer cluster.Destroy() err = cluster.RunInitialServers() diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 4644b2131d1..117c036fba6 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -60,7 +60,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index 7da0bf652bb..61cf885e0c2 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -64,7 +64,7 @@ func (suite *memberTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index fbfe52d6d68..dcc9137e436 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -62,7 +62,7 @@ func (suite *tsoAPITestSuite) SetupTest() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.pdCluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.pdCluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.pdCluster.RunInitialServers() re.NoError(err) @@ -137,7 +137,7 @@ func TestTSOServerStartFirst(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + pdCluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{"k1", "k2"} }) defer pdCluster.Destroy() @@ -200,7 +200,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 51ace0f08c4..400450ad1ae 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -82,7 +82,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) @@ -538,7 +538,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) // Init PD config but not start. - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -735,7 +735,7 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) // Init PD config but not start. - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index 152122fe232..4b8c7fdba20 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -62,7 +62,7 @@ func (s *tsoProxyTestSuite) SetupSuite() { var err error s.ctx, s.cancel = context.WithCancel(context.Background()) // Create an PD cluster with 1 server - s.pdCluster, err = tests.NewTestClusterWithKeyspaceGroup(s.ctx, 1) + s.pdCluster, err = tests.NewTestCluster(s.ctx, 1) re.NoError(err) err = s.pdCluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 24141ded4f0..427294e277a 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -75,7 +75,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -156,19 +156,19 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() { func TestTSOPath(t *testing.T) { re := require.New(t) - checkTSOPath(re, true /*isKeyspaceGroupEnabled*/) - checkTSOPath(re, false /*isKeyspaceGroupEnabled*/) + checkTSOPath(re, true /*isTSODynamicSwitchingEnabled*/) + checkTSOPath(re, false /*isTSODynamicSwitchingEnabled*/) } -func checkTSOPath(re *require.Assertions, isKeyspaceGroupEnabled bool) { +func checkTSOPath(re *require.Assertions, isTSODynamicSwitchingEnabled bool) { var ( cluster *tests.TestCluster err error ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if isKeyspaceGroupEnabled { - cluster, err = tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + if isTSODynamicSwitchingEnabled { + cluster, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Microservice.EnableTSODynamicSwitching = false }) } else { @@ -184,7 +184,7 @@ func checkTSOPath(re *require.Assertions, isKeyspaceGroupEnabled bool) { re.NoError(pdLeader.BootstrapCluster()) backendEndpoints := pdLeader.GetAddr() client := pdLeader.GetEtcdClient() - if isKeyspaceGroupEnabled { + if isTSODynamicSwitchingEnabled { re.Equal(0, getEtcdTimestampKeyNum(re, client)) } else { re.Equal(1, getEtcdTimestampKeyNum(re, client)) @@ -227,17 +227,13 @@ type pdForward struct { pdClient pd.Client } -func NewPDForward(re *require.Assertions, isKeyspaceGroupEnabled bool) pdForward { +func NewPDForward(re *require.Assertions) pdForward { suite := pdForward{ re: re, } var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - if isKeyspaceGroupEnabled { - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 3) - } else { - suite.cluster, err = tests.NewTestCluster(suite.ctx, 3) - } + suite.cluster, err = tests.NewTestCluster(suite.ctx, 3) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -277,7 +273,7 @@ func (suite *pdForward) ShutDown() { func TestForwardTSO(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, false) + suite := NewPDForward(re) defer suite.ShutDown() // If EnableTSODynamicSwitching is false, the tso server will be provided by PD. // The tso server won't affect the PD. @@ -296,9 +292,9 @@ func TestForwardTSO(t *testing.T) { suite.checkAvailableTSO(re) } -func TestForwardTSOWithKeyspaceGroupEnabled(t *testing.T) { +func TestForwardTSOWithKeyspaceGroup(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() // Unable to use the tso-related interface without tso server suite.checkUnavailableTSO(re) @@ -311,7 +307,7 @@ func TestForwardTSOWithKeyspaceGroupEnabled(t *testing.T) { func TestForwardTSOWhenPrimaryChanged(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -351,7 +347,7 @@ func TestForwardTSOWhenPrimaryChanged(t *testing.T) { func TestResignTSOPrimaryForward(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() // TODO: test random kill primary with 3 nodes tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -377,7 +373,7 @@ func TestResignTSOPrimaryForward(t *testing.T) { func TestResignAPIPrimaryForward(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -401,7 +397,7 @@ func TestResignAPIPrimaryForward(t *testing.T) { func TestForwardTSOUnexpectedToFollower1(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { // unary call will retry internally @@ -414,7 +410,7 @@ func TestForwardTSOUnexpectedToFollower1(t *testing.T) { func TestForwardTSOUnexpectedToFollower2(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { // unary call will retry internally @@ -428,7 +424,7 @@ func TestForwardTSOUnexpectedToFollower2(t *testing.T) { func TestForwardTSOUnexpectedToFollower3(t *testing.T) { re := require.New(t) - suite := NewPDForward(re, true) + suite := NewPDForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { _, _, err := suite.pdClient.GetTS(suite.ctx) @@ -533,7 +529,7 @@ func (suite *CommonTestSuite) SetupSuite() { var err error re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -597,7 +593,7 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { } check() - s, err := suite.cluster.JoinWithKeyspaceGroup(suite.ctx) + s, err := suite.cluster.Join(suite.ctx) re.NoError(err) re.NoError(s.Run()) @@ -619,7 +615,7 @@ func TestTSOServiceSwitch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Microservice.EnableTSODynamicSwitching = true }, diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index f83defd5a62..ed2d14bd579 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { if suite.legacy { suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) } else { - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, serverCount, func(conf *config.Config, _ string) { + suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount, func(conf *config.Config, _ string) { conf.Microservice.EnableTSODynamicSwitching = false }) } @@ -544,7 +544,7 @@ func TestUpgradingPDAndTSOClusters(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // Create an PD cluster which has 3 servers - pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 3) + pdCluster, err := tests.NewTestCluster(ctx, 3) re.NoError(err) err = pdCluster.RunInitialServers() re.NoError(err) diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 17f326f4a4d..1821f63d0f2 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -42,7 +42,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) { func (suite *keyspaceGroupTestSuite) SetupTest() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/testutil.go b/tests/testutil.go index 406e09345b9..20b325290cb 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -380,7 +380,7 @@ func (s *SchedulingTestEnvironment) startCluster(m Env) { re.NoError(leaderServer.BootstrapCluster()) s.clusters[NonMicroserviceEnv] = cluster case MicroserviceEnv: - cluster, err := NewTestClusterWithKeyspaceGroup(ctx, 1, s.opts...) + cluster, err := NewTestCluster(ctx, 1, s.opts...) re.NoError(err) err = cluster.RunInitialServers() re.NoError(err) diff --git a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go index 88ee782ab4b..221eaf66d55 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go @@ -41,7 +41,7 @@ func TestKeyspaceGroup(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1) + tc, err := pdTests.NewTestCluster(ctx, 1) re.NoError(err) defer tc.Destroy() err = tc.RunInitialServers() @@ -102,7 +102,7 @@ func TestSplitKeyspaceGroup(t *testing.T) { for i := range 129 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -157,7 +157,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -197,7 +197,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -301,7 +301,7 @@ func TestMergeKeyspaceGroup(t *testing.T) { for i := range 129 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -420,7 +420,7 @@ func TestKeyspaceGroupState(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -511,7 +511,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) diff --git a/tools/pd-ctl/tests/keyspace/keyspace_test.go b/tools/pd-ctl/tests/keyspace/keyspace_test.go index e6c28de599f..131afacf1db 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_test.go @@ -49,7 +49,7 @@ func TestKeyspace(t *testing.T) { for i := 1; i < 10; i++ { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -155,7 +155,7 @@ func (suite *keyspaceTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + tc, err := pdTests.NewTestCluster(suite.ctx, 1) re.NoError(err) re.NoError(tc.RunInitialServers()) tc.WaitLeader()