Skip to content

Commit

Permalink
client: support dynamic start/stop of the router client (#9082)
Browse files Browse the repository at this point in the history
ref #8690

Support dynamic start and stop of the router client.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Feb 19, 2025
1 parent b757cbe commit 2bbeb9c
Show file tree
Hide file tree
Showing 6 changed files with 555 additions and 290 deletions.
12 changes: 6 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,12 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.inner.option.SetTSOClientRPCConcurrency(value)
case opt.EnableRouterClient:
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableRouterClient option, it should be bool")
}
c.inner.option.SetEnableRouterClient(enable)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -569,12 +575,6 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return minTS.Physical, minTS.Logical, nil
}

// EnableRouterClient enables the router client.
// This is only for test currently.
func (c *client) EnableRouterClient() {
c.inner.initRouterClient()
}

func (c *client) getRouterClient() *router.Cli {
c.inner.RLock()
defer c.inner.RUnlock()
Expand Down
60 changes: 57 additions & 3 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,69 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
return err
}

// Check if the router client has been enabled.
if c.option.GetEnableRouterClient() {
c.enableRouterClient()
}
c.wg.Add(1)
go c.routerClientInitializer()

return nil
}

func (c *innerClient) initRouterClient() {
func (c *innerClient) routerClientInitializer() {
log.Info("[pd] start router client initializer")
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
log.Info("[pd] exit router client initializer")
return
case <-c.option.EnableRouterClientCh:
if c.option.GetEnableRouterClient() {
log.Info("[pd] notified to enable the router client")
c.enableRouterClient()
} else {
log.Info("[pd] notified to disable the router client")
c.disableRouterClient()
}
}
}
}

func (c *innerClient) enableRouterClient() {
// Check if the router client has been enabled.
c.RLock()
if c.routerClient != nil {
c.RUnlock()
return
}
c.RUnlock()
// Create a new router client first before acquiring the lock.
routerClient := router.NewClient(c.ctx, c.serviceDiscovery, c.option)
c.Lock()
defer c.Unlock()
// Double check if the router client has been enabled.
if c.routerClient != nil {
// Release the lock and close the router client.
c.Unlock()
routerClient.Close()
return
}
c.routerClient = router.NewClient(c.ctx, c.serviceDiscovery, c.option)
c.routerClient = routerClient
c.Unlock()
}

func (c *innerClient) disableRouterClient() {
c.Lock()
if c.routerClient == nil {
c.Unlock()
return
}
routerClient := c.routerClient
c.routerClient = nil
c.Unlock()
// Close the router client after the lock is released.
routerClient.Close()
}

func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
Expand Down Expand Up @@ -214,6 +267,7 @@ func (c *innerClient) setup() error {

// Create dispatchers
c.createTokenDispatcher()

return nil
}

Expand Down
48 changes: 33 additions & 15 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultTSOClientRPCConcurrency = 1
defaultEnableRouterClient = false
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -49,6 +50,9 @@ const (
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency
// EnableRouterClient is the router client option.
// It is stored as bool.
EnableRouterClient

dynamicOptionCount
)
Expand All @@ -70,6 +74,7 @@ type Option struct {
dynamicOptions [dynamicOptionCount]atomic.Value

EnableTSOFollowerProxyCh chan struct{}
EnableRouterClientCh chan struct{}
}

// NewOption creates a new PD client option with the default values set.
Expand All @@ -78,13 +83,15 @@ func NewOption() *Option {
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
EnableRouterClientCh: make(chan struct{}, 1),
InitMetrics: true,
}

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency)
co.dynamicOptions[EnableRouterClient].Store(defaultEnableRouterClient)
return co
}

Expand All @@ -94,19 +101,13 @@ func (o *Option) SetMaxTSOBatchWaitInterval(interval time.Duration) error {
if interval < 0 || interval > 10*time.Millisecond {
return errors.New("[pd] invalid max TSO batch wait interval, should be between 0 and 10ms")
}
old := o.GetMaxTSOBatchWaitInterval()
if interval != old {
o.dynamicOptions[MaxTSOBatchWaitInterval].Store(interval)
}
o.dynamicOptions[MaxTSOBatchWaitInterval].CompareAndSwap(o.GetMaxTSOBatchWaitInterval(), interval)
return nil
}

// SetEnableFollowerHandle set the Follower Handle option.
func (o *Option) SetEnableFollowerHandle(enable bool) {
old := o.GetEnableFollowerHandle()
if enable != old {
o.dynamicOptions[EnableFollowerHandle].Store(enable)
}
o.dynamicOptions[EnableFollowerHandle].CompareAndSwap(!enable, enable)
}

// GetEnableFollowerHandle gets the Follower Handle enable option.
Expand All @@ -121,9 +122,7 @@ func (o *Option) GetMaxTSOBatchWaitInterval() time.Duration {

// SetEnableTSOFollowerProxy sets the TSO Follower Proxy option.
func (o *Option) SetEnableTSOFollowerProxy(enable bool) {
old := o.GetEnableTSOFollowerProxy()
if enable != old {
o.dynamicOptions[EnableTSOFollowerProxy].Store(enable)
if o.dynamicOptions[EnableTSOFollowerProxy].CompareAndSwap(!enable, enable) {
select {
case o.EnableTSOFollowerProxyCh <- struct{}{}:
default:
Expand All @@ -138,17 +137,29 @@ func (o *Option) GetEnableTSOFollowerProxy() bool {

// SetTSOClientRPCConcurrency sets the TSO client RPC concurrency option.
func (o *Option) SetTSOClientRPCConcurrency(value int) {
old := o.GetTSOClientRPCConcurrency()
if value != old {
o.dynamicOptions[TSOClientRPCConcurrency].Store(value)
}
o.dynamicOptions[TSOClientRPCConcurrency].CompareAndSwap(o.GetTSOClientRPCConcurrency(), value)
}

// GetTSOClientRPCConcurrency gets the TSO client RPC concurrency option.
func (o *Option) GetTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}

// SetEnableRouterClient sets the router client option.
func (o *Option) SetEnableRouterClient(enable bool) {
if o.dynamicOptions[EnableRouterClient].CompareAndSwap(!enable, enable) {
select {
case o.EnableRouterClientCh <- struct{}{}:
default:
}
}
}

// GetEnableRouterClient gets the router client option.
func (o *Option) GetEnableRouterClient() bool {
return o.dynamicOptions[EnableRouterClient].Load().(bool)
}

// ClientOption configures client.
type ClientOption func(*Option)

Expand Down Expand Up @@ -210,6 +221,13 @@ func WithBackoffer(bo *retry.Backoffer) ClientOption {
}
}

// WithEnableRouterClient configures the client with router client option.
func WithEnableRouterClient(enable bool) ClientOption {
return func(op *Option) {
op.SetEnableRouterClient(enable)
}
}

// GetStoreOp represents available options when getting stores.
type GetStoreOp struct {
ExcludeTombstone bool
Expand Down
123 changes: 91 additions & 32 deletions client/opt/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,101 @@ import (
func TestDynamicOptionChange(t *testing.T) {
re := require.New(t)
o := NewOption()
// Check the default value setting.
re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval())
re.Equal(defaultEnableTSOFollowerProxy, o.GetEnableTSOFollowerProxy())
re.Equal(defaultEnableFollowerHandle, o.GetEnableFollowerHandle())

// Check the invalid value setting.
re.Error(o.SetMaxTSOBatchWaitInterval(time.Second))
re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval())
expectInterval := time.Millisecond
o.SetMaxTSOBatchWaitInterval(expectInterval)
re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval())
expectInterval = time.Duration(float64(time.Millisecond) * 0.5)
o.SetMaxTSOBatchWaitInterval(expectInterval)
re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval())
expectInterval = time.Duration(float64(time.Millisecond) * 1.5)
o.SetMaxTSOBatchWaitInterval(expectInterval)
re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval())

expectBool := true
o.SetEnableTSOFollowerProxy(expectBool)
// Check the value changing notification.
testutil.Eventually(re, func() bool {
<-o.EnableTSOFollowerProxyCh
return true
})
re.Equal(expectBool, o.GetEnableTSOFollowerProxy())
// Check whether any data will be sent to the channel.
// It will panic if the test fails.
close(o.EnableTSOFollowerProxyCh)
// Setting the same value should not notify the channel.

// Test default values.
re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval(), "default max TSO batch wait interval")
re.Equal(defaultEnableTSOFollowerProxy, o.GetEnableTSOFollowerProxy(), "default enable TSO follower proxy")
re.Equal(defaultEnableFollowerHandle, o.GetEnableFollowerHandle(), "default enable follower handle")
re.Equal(defaultTSOClientRPCConcurrency, o.GetTSOClientRPCConcurrency(), "default TSO client RPC concurrency")
re.Equal(defaultEnableRouterClient, o.GetEnableRouterClient(), "default enable router client")

// Test invalid setting.
err := o.SetMaxTSOBatchWaitInterval(time.Second)
re.Error(err, "expect error for invalid high interval")
// Value remains unchanged.
re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval(), "max TSO batch wait interval should not change to an invalid value")

// Define a list of valid intervals.
validIntervals := []time.Duration{
time.Millisecond,
time.Duration(float64(time.Millisecond) * 0.5),
time.Duration(float64(time.Millisecond) * 1.5),
10 * time.Millisecond,
0,
}
for _, interval := range validIntervals {
// Use a subtest for each valid interval.
err := o.SetMaxTSOBatchWaitInterval(interval)
re.NoError(err, "expected interval %v to be set without error", interval)
re.Equal(interval, o.GetMaxTSOBatchWaitInterval(), "max TSO batch wait interval should be updated to %v", interval)
}

clearChannel(o.EnableTSOFollowerProxyCh)

// Testing that the setting is effective and a notification is sent.
var expectBool bool
for _, expectBool = range []bool{true, false} {
o.SetEnableTSOFollowerProxy(expectBool)
testutil.Eventually(re, func() bool {
select {
case <-o.EnableTSOFollowerProxyCh:
default:
return false
}
return o.GetEnableTSOFollowerProxy() == expectBool
})
}

// Testing that setting the same value should not trigger a notification.
o.SetEnableTSOFollowerProxy(expectBool)
ensureNoNotification(t, o.EnableTSOFollowerProxyCh)

// This option does not use a notification channel.
expectBool = true
o.SetEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.GetEnableFollowerHandle())
re.Equal(expectBool, o.GetEnableFollowerHandle(), "EnableFollowerHandle should be set to true")
expectBool = false
o.SetEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.GetEnableFollowerHandle())
re.Equal(expectBool, o.GetEnableFollowerHandle(), "EnableFollowerHandle should be set to false")

expectInt := 10
o.SetTSOClientRPCConcurrency(expectInt)
re.Equal(expectInt, o.GetTSOClientRPCConcurrency(), "TSOClientRPCConcurrency should update accordingly")

clearChannel(o.EnableRouterClientCh)

// Testing that the setting is effective and a notification is sent.
for _, expectBool = range []bool{true, false} {
o.SetEnableRouterClient(expectBool)
testutil.Eventually(re, func() bool {
select {
case <-o.EnableRouterClientCh:
default:
return false
}
return o.GetEnableRouterClient() == expectBool
})
}

// Testing that setting the same value should not trigger a notification.
o.SetEnableRouterClient(expectBool)
ensureNoNotification(t, o.EnableRouterClientCh)
}

// clearChannel drains any pending events from the channel.
func clearChannel(ch chan struct{}) {
select {
case <-ch:
default:
}
}

// ensureNoNotification checks that no notification is sent on the channel within a short timeout.
func ensureNoNotification(t *testing.T, ch chan struct{}) {
select {
case v := <-ch:
t.Fatalf("unexpected notification received: %v", v)
case <-time.After(100 * time.Millisecond):
// No notification received as expected.
}
}
Loading

0 comments on commit 2bbeb9c

Please sign in to comment.