Skip to content

Commit

Permalink
Track shard term and make available to client
Browse files Browse the repository at this point in the history
  • Loading branch information
kevburnsjr committed Jan 6, 2025
1 parent 0befbdf commit 7f8db32
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 23 deletions.
4 changes: 2 additions & 2 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ func (a *Agent) replicaDelete(replicaID uint64) (err error) {
}

// shardLeaderSet sets the leader of a shard
func (a *Agent) shardLeaderSet(shardID, replicaID uint64) (err error) {
_, err = a.primePropose(newCmdShardLeaderSet(shardID, replicaID))
func (a *Agent) shardLeaderSet(shardID, replicaID, term uint64) (err error) {
_, err = a.primePropose(newCmdShardLeaderSet(shardID, replicaID, term))
if err == nil {
a.log.Infof("Shard %05d leader set to %05d", shardID, replicaID)
}
Expand Down
1 change: 1 addition & 0 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (fsm *fsm) Update(entry Entry) (Result, error) {
break
}
shard.Leader = cmd.Shard.Leader
shard.Term = cmd.Shard.Term
shard.Updated = entry.Index
state.shardPut(shard)
entry.Result.Value = 1
Expand Down
3 changes: 2 additions & 1 deletion fsm_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ func newCmdShardStatusUpdate(id uint64, status ShardStatus) (b []byte) {
return
}

func newCmdShardLeaderSet(shardID, replicaID uint64) (b []byte) {
func newCmdShardLeaderSet(shardID, replicaID, term uint64) (b []byte) {
b, _ = json.Marshal(commandShard{command{
Action: command_action_leader_set,
Type: command_type_shard,
}, Shard{
ID: shardID,
Leader: replicaID,
Term: term,
}})
return
}
Expand Down
36 changes: 27 additions & 9 deletions shard_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
// ShardClient can be used to interact with a shard regardless of its placement in the cluster
// Requests will be forwarded to the appropriate host based on ping
type ShardClient interface {
Index(ctx context.Context) (err error)
Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error)
Commit(ctx context.Context, cmd []byte) (err error)
Index(ctx context.Context) (err error)
Leader() (uint64, uint64)
Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error)
Watch(ctx context.Context, query []byte, results chan<- *Result, stale bool) (err error)
}
Expand Down Expand Up @@ -53,10 +54,24 @@ func (c *client) Index(ctx context.Context) (err error) {
return
}

func (c *client) Leader() (replicaID, term uint64) {
c.manager.mutex.RLock()
leader, ok := c.manager.clientLeader[c.shardID]
c.manager.mutex.RUnlock()
if ok {
replicaID = leader.replicaID
term = leader.term
}
return
}

func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) {
if c.writeToLeader {
if client, ok := c.manager.clientLeader[c.shardID]; ok {
return client.Apply(ctx, c.shardID, cmd)
c.manager.mutex.RLock()
leader, ok := c.manager.clientLeader[c.shardID]
c.manager.mutex.RUnlock()
if ok {
return leader.client.Apply(ctx, c.shardID, cmd)
}
}
c.manager.mutex.RLock()
Expand All @@ -77,8 +92,11 @@ func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []by

func (c *client) Commit(ctx context.Context, cmd []byte) (err error) {
if c.writeToLeader {
if client, ok := c.manager.clientLeader[c.shardID]; ok {
return client.Commit(ctx, c.shardID, cmd)
c.manager.mutex.RLock()
leader, ok := c.manager.clientLeader[c.shardID]
c.manager.mutex.RUnlock()
if ok {
return leader.client.Commit(ctx, c.shardID, cmd)
}
}
c.manager.mutex.RLock()
Expand Down Expand Up @@ -121,8 +139,8 @@ func (c *client) Read(ctx context.Context, query []byte, stale bool) (value uint
}
}
if c.writeToLeader {
if client, ok := c.manager.clientLeader[c.shardID]; ok {
return client.Read(ctx, c.shardID, query, stale)
if leader, ok := c.manager.clientLeader[c.shardID]; ok {
return leader.client.Read(ctx, c.shardID, query, stale)
}
}
c.manager.mutex.RLock()
Expand Down Expand Up @@ -165,8 +183,8 @@ func (c *client) Watch(ctx context.Context, query []byte, results chan<- *Result
}
}
if c.writeToLeader {
if client, ok := c.manager.clientLeader[c.shardID]; ok {
return client.Watch(ctx, c.shardID, query, results, stale)
if leader, ok := c.manager.clientLeader[c.shardID]; ok {
return leader.client.Watch(ctx, c.shardID, query, results, stale)
}
}
c.manager.mutex.RLock()
Expand Down
20 changes: 14 additions & 6 deletions shard_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type clientManager struct {
ctx context.Context
ctxCancel context.CancelFunc
clientHost map[string]hostClient
clientLeader map[uint64]hostClient
clientLeader map[uint64]hostClientLeader
clientMember map[uint64]*orderedmap.OrderedMap[int64, hostClient]
clientReplica map[uint64]*orderedmap.OrderedMap[int64, hostClient]
index uint64
Expand All @@ -28,13 +28,19 @@ type clientManager struct {
wg sync.WaitGroup
}

type hostClientLeader struct {
client hostClient
replicaID uint64
term uint64
}

func newClientManager(agent *Agent) *clientManager {
return &clientManager{
log: agent.log,
agent: agent,
clock: clock.New(),
clientHost: map[string]hostClient{},
clientLeader: map[uint64]hostClient{},
clientLeader: map[uint64]hostClientLeader{},
clientMember: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
clientReplica: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
}
Expand Down Expand Up @@ -78,7 +84,7 @@ func (c *clientManager) tick() {
state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
shardCount++
index = shard.Updated
var leader hostClient
var leaderClient hostClient
members := []hostClientPing{}
replicas := []hostClientPing{}
state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
Expand All @@ -105,7 +111,7 @@ func (c *clientManager) tick() {
members = append(members, hostClientPing{ping.Nanoseconds(), client})
}
if shard.Leader == replica.ID {
leader = client
leaderClient = client
}
return true
})
Expand All @@ -120,8 +126,10 @@ func (c *clientManager) tick() {
newReplicas.Set(item.ping, item.client)
}
c.mutex.Lock()
if leader.agent != nil {
c.clientLeader[shard.ID] = leader
if leaderClient.agent != nil {
c.clientLeader[shard.ID] = hostClientLeader{leaderClient, shard.Leader, shard.Term}
} else {
delete(c.clientLeader, shard.ID)
}
c.clientMember[shard.ID] = newMembers
c.clientReplica[shard.ID] = newReplicas
Expand Down
6 changes: 3 additions & 3 deletions shard_controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ func (c *controllerManager) tick() {
}

func (c *controllerManager) LeaderUpdated(info LeaderInfo) {
c.log.Infof("[%05d:%05d] LeaderUpdated: %05d", info.ShardID, info.ReplicaID, info.LeaderID)
c.log.Infof("[%05d:%05d] LeaderUpdated: %05d (term %d)", info.ShardID, info.ReplicaID, info.LeaderID, info.Term)
if info.ShardID == 0 {
c.isLeader.Store(info.LeaderID == info.ReplicaID)
}
if c.index > 0 && c.isLeader.Load() {
c.agent.shardLeaderSet(info.ShardID, info.LeaderID)
if c.isLeader.Load() {
c.agent.shardLeaderSet(info.ShardID, info.LeaderID, info.Term)
}
}

Expand Down
5 changes: 3 additions & 2 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ type Shard struct {
Status ShardStatus `json:"status"`
Tags map[string]string `json:"tags"`

Type string `json:"type"`
Name string `json:"name"`
Leader uint64 `json:"leader"`
Name string `json:"name"`
Term uint64 `json:"term"`
Type string `json:"type"`
}

type Replica struct {
Expand Down

0 comments on commit 7f8db32

Please sign in to comment.