From c28afcc297e6cd64819242756bde568519600d81 Mon Sep 17 00:00:00 2001 From: kevburnsjr Date: Sun, 5 Jan 2025 07:35:45 -0800 Subject: [PATCH] Shard Client option WriteToLeader --- agent.go | 13 +++++++++++-- agent_integration_test.go | 19 ++++++++++++------- fsm.go | 11 +++++++++++ fsm_cmd.go | 12 ++++++++++++ shard_client.go | 17 ++++++++++++++--- shard_client_manager.go | 17 ++++++++++++++++- shard_client_option.go | 7 +++++++ shard_controller_manager.go | 33 ++++++++++++++++----------------- types.go | 5 +++-- 9 files changed, 102 insertions(+), 32 deletions(-) diff --git a/agent.go b/agent.go index 28d0cb6..78b53f6 100644 --- a/agent.go +++ b/agent.go @@ -92,8 +92,8 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent // Client returns a client for a specific shard. // It will send writes to the nearest member and send reads to the nearest replica (by ping). -func (a *Agent) Client(shardID uint64) (c ShardClient) { - c, _ = newClient(a.clientManager, shardID) +func (a *Agent) Client(shardID uint64, opts ...ClientOption) (c ShardClient) { + c, _ = newClient(a.clientManager, shardID, opts...) return } @@ -425,6 +425,15 @@ func (a *Agent) replicaDelete(replicaID uint64) (err error) { return } +// shardLeaderSet sets the leader of a shard +func (a *Agent) shardLeaderSet(shardID, replicaID uint64) (err error) { + _, err = a.primePropose(newCmdShardLeaderSet(shardID, replicaID)) + if err == nil { + a.log.Infof("Shard %05d leader set to %05d", shardID, replicaID) + } + return +} + func (a *Agent) setStatus(s AgentStatus) { a.mutex.Lock() defer a.mutex.Unlock() diff --git a/agent_integration_test.go b/agent_integration_test.go index c838c66..20db273 100644 --- a/agent_integration_test.go +++ b/agent_integration_test.go @@ -148,12 +148,14 @@ func TestAgent(t *testing.T) { } for _, op := range []string{"update", "query", "watch"} { for _, linearity := range []string{"linear", "non-linear"} { - t.Run(fmt.Sprintf(`%s %s %s host client`, sm, op, linearity), func(t *testing.T) { - runAgentSubTest(t, agents, shard, op, linearity != "linear") - }) - t.Run(fmt.Sprintf(`%s %s %s shard client`, sm, op, linearity), func(t *testing.T) { - runAgentSubTestByShard(t, agents, shard, op, linearity != "linear") - }) + for _, writeTarget := range []string{"all", "leader"} { + t.Run(fmt.Sprintf(`%s %s %s host client`, sm, op, linearity), func(t *testing.T) { + runAgentSubTest(t, agents, shard, op, linearity != "linear") + }) + t.Run(fmt.Sprintf(`%s %s %s shard client`, sm, op, linearity), func(t *testing.T) { + runAgentSubTestByShard(t, agents, shard, op, linearity != "linear", writeTarget == "leader") + }) + } } } } @@ -338,13 +340,16 @@ func runAgentSubTest(t *testing.T, agents []*Agent, shard Shard, op string, stal } } -func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, op string, stale bool) { +func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, op string, stale, writeToleader bool) { var i = 0 var err error var val uint64 for _, a := range agents { val = 0 client := a.Client(shard.ID) + if writeToleader { + client = a.Client(shard.ID, WithWriteToLeader()) + } require.NotNil(t, client) if op == "update" && stale { err = client.Commit(raftCtx(), bytes.Repeat([]byte("test"), i+1)) diff --git a/fsm.go b/fsm.go index ef7589a..decd70d 100644 --- a/fsm.go +++ b/fsm.go @@ -165,6 +165,17 @@ func (fsm *fsm) Update(entry Entry) (Result, error) { return true }) entry.Result.Value = 1 + // Set Leader + case command_action_leader_set: + shard, ok := state.Shard(cmd.Shard.ID) + if !ok { + fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd) + break + } + shard.Leader = cmd.Shard.Leader + shard.Updated = entry.Index + state.shardPut(shard) + entry.Result.Value = 1 // Delete case command_action_del: shard, ok := state.Shard(cmd.Shard.ID) diff --git a/fsm_cmd.go b/fsm_cmd.go index 08eef5d..1bb4386 100644 --- a/fsm_cmd.go +++ b/fsm_cmd.go @@ -18,6 +18,7 @@ const ( command_action_tags_set = "tags-set" command_action_tags_setnx = "tags-setnx" command_action_tags_remove = "tags-remove" + command_action_leader_set = "leader-set" ) type command struct { @@ -92,6 +93,17 @@ func newCmdShardStatusUpdate(id uint64, status ShardStatus) (b []byte) { return } +func newCmdShardLeaderSet(shardID, replicaID uint64) (b []byte) { + b, _ = json.Marshal(commandShard{command{ + Action: command_action_leader_set, + Type: command_type_shard, + }, Shard{ + ID: shardID, + Leader: replicaID, + }}) + return +} + func newCmdShardDel(shardID uint64) (b []byte) { b, _ = json.Marshal(commandShard{command{ Action: command_action_del, diff --git a/shard_client.go b/shard_client.go index 2974997..e955db7 100644 --- a/shard_client.go +++ b/shard_client.go @@ -16,9 +16,10 @@ type ShardClient interface { // The shard client type client struct { - manager *clientManager - shardID uint64 - retries int + manager *clientManager + shardID uint64 + retries int + writeToLeader bool } func newClient(manager *clientManager, shardID uint64, opts ...ClientOption) (c *client, err error) { @@ -53,6 +54,11 @@ func (c *client) Index(ctx context.Context) (err error) { } func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) { + if c.writeToLeader { + if client, ok := c.manager.clientLeader[c.shardID]; ok { + return client.Apply(ctx, c.shardID, cmd) + } + } c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() @@ -70,6 +76,11 @@ func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []by } func (c *client) Commit(ctx context.Context, cmd []byte) (err error) { + if c.writeToLeader { + if client, ok := c.manager.clientLeader[c.shardID]; ok { + return client.Commit(ctx, c.shardID, cmd) + } + } c.manager.mutex.RLock() list, ok := c.manager.clientMember[c.shardID] c.manager.mutex.RUnlock() diff --git a/shard_client_manager.go b/shard_client_manager.go index 465dc63..12eaefd 100644 --- a/shard_client_manager.go +++ b/shard_client_manager.go @@ -18,6 +18,7 @@ type clientManager struct { ctx context.Context ctxCancel context.CancelFunc clientHost map[string]hostClient + clientLeader map[uint64]hostClient clientMember map[uint64]*orderedmap.OrderedMap[int64, hostClient] clientReplica map[uint64]*orderedmap.OrderedMap[int64, hostClient] index uint64 @@ -33,6 +34,7 @@ func newClientManager(agent *Agent) *clientManager { agent: agent, clock: clock.New(), clientHost: map[string]hostClient{}, + clientLeader: map[uint64]hostClient{}, clientMember: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{}, clientReplica: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{}, } @@ -76,6 +78,7 @@ func (c *clientManager) tick() { state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool { shardCount++ index = shard.Updated + var leader hostClient members := []hostClientPing{} replicas := []hostClientPing{} state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool { @@ -101,6 +104,9 @@ func (c *clientManager) tick() { } else { members = append(members, hostClientPing{ping.Nanoseconds(), client}) } + if shard.Leader == replica.ID { + leader = client + } return true }) slices.SortFunc(members, byPingAsc) @@ -114,6 +120,9 @@ func (c *clientManager) tick() { newReplicas.Set(item.ping, item.client) } c.mutex.Lock() + if leader.agent != nil { + c.clientLeader[shard.ID] = leader + } c.clientMember[shard.ID] = newMembers c.clientReplica[shard.ID] = newReplicas c.mutex.Unlock() @@ -121,7 +130,13 @@ func (c *clientManager) tick() { }) }) if err == nil && shardCount > 0 { - c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d time: %vms", c.agent.hostID(), len(pings), shardCount, replicaCount, float64(c.clock.Since(start)/time.Microsecond)/1000) + c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d leaders: %d time: %vms", + c.agent.hostID(), + len(pings), + shardCount, + replicaCount, + len(c.clientLeader), + float64(c.clock.Since(start)/time.Microsecond)/1000) c.index = index } return diff --git a/shard_client_option.go b/shard_client_option.go index ae6a749..6631de2 100644 --- a/shard_client_option.go +++ b/shard_client_option.go @@ -8,3 +8,10 @@ func WithRetries(retries int) ClientOption { return nil } } + +func WithWriteToLeader() ClientOption { + return func(c *client) error { + c.writeToLeader = true + return nil + } +} diff --git a/shard_controller_manager.go b/shard_controller_manager.go index f4ce9e9..d6cb526 100644 --- a/shard_controller_manager.go +++ b/shard_controller_manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -11,18 +12,16 @@ import ( // The controllerManager creates and destroys replicas based on a shard tags. type controllerManager struct { - agent *Agent - clock clock.Clock - ctx context.Context - ctxCancel context.CancelFunc - index uint64 - isLeader bool - lastHostID string - leaderIndex uint64 - log Logger - mutex sync.RWMutex - controller Controller - wg sync.WaitGroup + agent *Agent + clock clock.Clock + ctx context.Context + ctxCancel context.CancelFunc + index uint64 + isLeader atomic.Bool + log Logger + mutex sync.RWMutex + controller Controller + wg sync.WaitGroup } func newControllerManager(agent *Agent) *controllerManager { @@ -68,7 +67,7 @@ func (c *controllerManager) tick() { var index uint64 var updated = true var controls = newControls(c.agent) - if c.isLeader { + if c.isLeader.Load() { for updated { updated = false err = c.agent.State(c.ctx, func(state *State) { @@ -117,10 +116,10 @@ func (c *controllerManager) tick() { func (c *controllerManager) LeaderUpdated(info LeaderInfo) { c.log.Infof("[%05d:%05d] LeaderUpdated: %05d", info.ShardID, info.ReplicaID, info.LeaderID) if info.ShardID == 0 { - c.mutex.Lock() - c.isLeader = info.LeaderID == info.ReplicaID - c.mutex.Unlock() - return + c.isLeader.Store(info.LeaderID == info.ReplicaID) + } + if c.index > 0 && c.isLeader.Load() { + c.agent.shardLeaderSet(info.ShardID, info.LeaderID) } } diff --git a/types.go b/types.go index 060efb3..47671ea 100644 --- a/types.go +++ b/types.go @@ -19,8 +19,9 @@ type Shard struct { Status ShardStatus `json:"status"` Tags map[string]string `json:"tags"` - Type string `json:"type"` - Name string `json:"name"` + Type string `json:"type"` + Name string `json:"name"` + Leader uint64 `json:"leader"` } type Replica struct {