Skip to content

Commit

Permalink
Shard Client option WriteToLeader
Browse files Browse the repository at this point in the history
  • Loading branch information
kevburnsjr committed Jan 5, 2025
1 parent 4781a6c commit c28afcc
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 32 deletions.
13 changes: 11 additions & 2 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent

// Client returns a client for a specific shard.
// It will send writes to the nearest member and send reads to the nearest replica (by ping).
func (a *Agent) Client(shardID uint64) (c ShardClient) {
c, _ = newClient(a.clientManager, shardID)
func (a *Agent) Client(shardID uint64, opts ...ClientOption) (c ShardClient) {
c, _ = newClient(a.clientManager, shardID, opts...)
return
}

Expand Down Expand Up @@ -425,6 +425,15 @@ func (a *Agent) replicaDelete(replicaID uint64) (err error) {
return
}

// shardLeaderSet sets the leader of a shard
func (a *Agent) shardLeaderSet(shardID, replicaID uint64) (err error) {
_, err = a.primePropose(newCmdShardLeaderSet(shardID, replicaID))
if err == nil {
a.log.Infof("Shard %05d leader set to %05d", shardID, replicaID)
}
return
}

func (a *Agent) setStatus(s AgentStatus) {
a.mutex.Lock()
defer a.mutex.Unlock()
Expand Down
19 changes: 12 additions & 7 deletions agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ func TestAgent(t *testing.T) {
}
for _, op := range []string{"update", "query", "watch"} {
for _, linearity := range []string{"linear", "non-linear"} {
t.Run(fmt.Sprintf(`%s %s %s host client`, sm, op, linearity), func(t *testing.T) {
runAgentSubTest(t, agents, shard, op, linearity != "linear")
})
t.Run(fmt.Sprintf(`%s %s %s shard client`, sm, op, linearity), func(t *testing.T) {
runAgentSubTestByShard(t, agents, shard, op, linearity != "linear")
})
for _, writeTarget := range []string{"all", "leader"} {
t.Run(fmt.Sprintf(`%s %s %s host client`, sm, op, linearity), func(t *testing.T) {
runAgentSubTest(t, agents, shard, op, linearity != "linear")
})
t.Run(fmt.Sprintf(`%s %s %s shard client`, sm, op, linearity), func(t *testing.T) {
runAgentSubTestByShard(t, agents, shard, op, linearity != "linear", writeTarget == "leader")
})
}
}
}
}
Expand Down Expand Up @@ -338,13 +340,16 @@ func runAgentSubTest(t *testing.T, agents []*Agent, shard Shard, op string, stal
}
}

func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, op string, stale bool) {
func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, op string, stale, writeToleader bool) {
var i = 0
var err error
var val uint64
for _, a := range agents {
val = 0
client := a.Client(shard.ID)
if writeToleader {
client = a.Client(shard.ID, WithWriteToLeader())
}
require.NotNil(t, client)
if op == "update" && stale {
err = client.Commit(raftCtx(), bytes.Repeat([]byte("test"), i+1))
Expand Down
11 changes: 11 additions & 0 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ func (fsm *fsm) Update(entry Entry) (Result, error) {
return true
})
entry.Result.Value = 1
// Set Leader
case command_action_leader_set:
shard, ok := state.Shard(cmd.Shard.ID)
if !ok {
fsm.log.Warningf("%v: %#v", ErrShardNotFound, cmd)
break
}
shard.Leader = cmd.Shard.Leader
shard.Updated = entry.Index
state.shardPut(shard)
entry.Result.Value = 1
// Delete
case command_action_del:
shard, ok := state.Shard(cmd.Shard.ID)
Expand Down
12 changes: 12 additions & 0 deletions fsm_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
command_action_tags_set = "tags-set"
command_action_tags_setnx = "tags-setnx"
command_action_tags_remove = "tags-remove"
command_action_leader_set = "leader-set"
)

type command struct {
Expand Down Expand Up @@ -92,6 +93,17 @@ func newCmdShardStatusUpdate(id uint64, status ShardStatus) (b []byte) {
return
}

func newCmdShardLeaderSet(shardID, replicaID uint64) (b []byte) {
b, _ = json.Marshal(commandShard{command{
Action: command_action_leader_set,
Type: command_type_shard,
}, Shard{
ID: shardID,
Leader: replicaID,
}})
return
}

func newCmdShardDel(shardID uint64) (b []byte) {
b, _ = json.Marshal(commandShard{command{
Action: command_action_del,
Expand Down
17 changes: 14 additions & 3 deletions shard_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ type ShardClient interface {

// The shard client
type client struct {
manager *clientManager
shardID uint64
retries int
manager *clientManager
shardID uint64
retries int
writeToLeader bool
}

func newClient(manager *clientManager, shardID uint64, opts ...ClientOption) (c *client, err error) {
Expand Down Expand Up @@ -53,6 +54,11 @@ func (c *client) Index(ctx context.Context) (err error) {
}

func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error) {
if c.writeToLeader {
if client, ok := c.manager.clientLeader[c.shardID]; ok {
return client.Apply(ctx, c.shardID, cmd)
}
}
c.manager.mutex.RLock()
list, ok := c.manager.clientMember[c.shardID]
c.manager.mutex.RUnlock()
Expand All @@ -70,6 +76,11 @@ func (c *client) Apply(ctx context.Context, cmd []byte) (value uint64, data []by
}

func (c *client) Commit(ctx context.Context, cmd []byte) (err error) {
if c.writeToLeader {
if client, ok := c.manager.clientLeader[c.shardID]; ok {
return client.Commit(ctx, c.shardID, cmd)
}
}
c.manager.mutex.RLock()
list, ok := c.manager.clientMember[c.shardID]
c.manager.mutex.RUnlock()
Expand Down
17 changes: 16 additions & 1 deletion shard_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type clientManager struct {
ctx context.Context
ctxCancel context.CancelFunc
clientHost map[string]hostClient
clientLeader map[uint64]hostClient
clientMember map[uint64]*orderedmap.OrderedMap[int64, hostClient]
clientReplica map[uint64]*orderedmap.OrderedMap[int64, hostClient]
index uint64
Expand All @@ -33,6 +34,7 @@ func newClientManager(agent *Agent) *clientManager {
agent: agent,
clock: clock.New(),
clientHost: map[string]hostClient{},
clientLeader: map[uint64]hostClient{},
clientMember: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
clientReplica: map[uint64]*orderedmap.OrderedMap[int64, hostClient]{},
}
Expand Down Expand Up @@ -76,6 +78,7 @@ func (c *clientManager) tick() {
state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
shardCount++
index = shard.Updated
var leader hostClient
members := []hostClientPing{}
replicas := []hostClientPing{}
state.ReplicaIterateByShardID(shard.ID, func(replica Replica) bool {
Expand All @@ -101,6 +104,9 @@ func (c *clientManager) tick() {
} else {
members = append(members, hostClientPing{ping.Nanoseconds(), client})
}
if shard.Leader == replica.ID {
leader = client
}
return true
})
slices.SortFunc(members, byPingAsc)
Expand All @@ -114,14 +120,23 @@ func (c *clientManager) tick() {
newReplicas.Set(item.ping, item.client)
}
c.mutex.Lock()
if leader.agent != nil {
c.clientLeader[shard.ID] = leader
}
c.clientMember[shard.ID] = newMembers
c.clientReplica[shard.ID] = newReplicas
c.mutex.Unlock()
return true
})
})
if err == nil && shardCount > 0 {
c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d time: %vms", c.agent.hostID(), len(pings), shardCount, replicaCount, float64(c.clock.Since(start)/time.Microsecond)/1000)
c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d leaders: %d time: %vms",
c.agent.hostID(),
len(pings),
shardCount,
replicaCount,
len(c.clientLeader),
float64(c.clock.Since(start)/time.Microsecond)/1000)
c.index = index
}
return
Expand Down
7 changes: 7 additions & 0 deletions shard_client_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ func WithRetries(retries int) ClientOption {
return nil
}
}

func WithWriteToLeader() ClientOption {
return func(c *client) error {
c.writeToLeader = true
return nil
}
}
33 changes: 16 additions & 17 deletions shard_controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
)

// The controllerManager creates and destroys replicas based on a shard tags.
type controllerManager struct {
agent *Agent
clock clock.Clock
ctx context.Context
ctxCancel context.CancelFunc
index uint64
isLeader bool
lastHostID string
leaderIndex uint64
log Logger
mutex sync.RWMutex
controller Controller
wg sync.WaitGroup
agent *Agent
clock clock.Clock
ctx context.Context
ctxCancel context.CancelFunc
index uint64
isLeader atomic.Bool
log Logger
mutex sync.RWMutex
controller Controller
wg sync.WaitGroup
}

func newControllerManager(agent *Agent) *controllerManager {
Expand Down Expand Up @@ -68,7 +67,7 @@ func (c *controllerManager) tick() {
var index uint64
var updated = true
var controls = newControls(c.agent)
if c.isLeader {
if c.isLeader.Load() {
for updated {
updated = false
err = c.agent.State(c.ctx, func(state *State) {
Expand Down Expand Up @@ -117,10 +116,10 @@ func (c *controllerManager) tick() {
func (c *controllerManager) LeaderUpdated(info LeaderInfo) {
c.log.Infof("[%05d:%05d] LeaderUpdated: %05d", info.ShardID, info.ReplicaID, info.LeaderID)
if info.ShardID == 0 {
c.mutex.Lock()
c.isLeader = info.LeaderID == info.ReplicaID
c.mutex.Unlock()
return
c.isLeader.Store(info.LeaderID == info.ReplicaID)
}
if c.index > 0 && c.isLeader.Load() {
c.agent.shardLeaderSet(info.ShardID, info.LeaderID)
}
}

Expand Down
5 changes: 3 additions & 2 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type Shard struct {
Status ShardStatus `json:"status"`
Tags map[string]string `json:"tags"`

Type string `json:"type"`
Name string `json:"name"`
Type string `json:"type"`
Name string `json:"name"`
Leader uint64 `json:"leader"`
}

type Replica struct {
Expand Down

0 comments on commit c28afcc

Please sign in to comment.