Skip to content

Commit

Permalink
Using variadicity for one optional argument feels goofy
Browse files Browse the repository at this point in the history
  • Loading branch information
kevburnsjr committed Mar 11, 2024
1 parent 6cf2aad commit ff651dd
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 34 deletions.
6 changes: 3 additions & 3 deletions _example/kv1-fast/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ func (q *kvQuery) MustMarshalBinary() []byte {
}

type kvCmd struct {
Op uint64 `json:"op"`
Key string `json:"key"`
Record kvRecord
Op uint64 `json:"op"`
Key string `json:"key"`
Record kvRecord `json:"rec"`
}

func (c *kvCmd) MustMarshalBinary() []byte {
Expand Down
6 changes: 3 additions & 3 deletions _example/kv1/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ func (q *kvQuery) MustMarshalBinary() []byte {
}

type kvCmd struct {
Op uint64 `json:"op"`
Key string `json:"key"`
Record kvRecord
Op uint64 `json:"op"`
Key string `json:"key"`
Record kvRecord `json:"rec"`
}

func (c *kvCmd) MustMarshalBinary() []byte {
Expand Down
14 changes: 6 additions & 8 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,10 @@ func (a *Agent) Status() AgentStatus {
// Pass a timeout context to avoid blocking indefinitely.
//
// Read is thread safe and will not block writes.
func (a *Agent) Read(ctx context.Context, fn func(*State), stale ...bool) (err error) {
if len(stale) == 0 || stale[0] == false {
err = a.readIndex(ctx, a.replicaConfig.ShardID)
if err != nil {
return
}
func (a *Agent) Read(ctx context.Context, fn func(*State)) (err error) {
err = a.readIndex(ctx, a.replicaConfig.ShardID)
if err != nil {
return
}
fn(a.fsm.state.withTxn(false))
return
Expand Down Expand Up @@ -778,10 +776,10 @@ func (a *Agent) findLocalReplicaID(shardID uint64) (id uint64) {
}

func (a *Agent) dumpState() {
a.Read(a.ctx, func(state *State) {
a.ReadStale(func(state *State) {
// Print snapshot
buf := bytes.NewBufferString("")
state.Save(buf)
a.log.Debugf(buf.String())
}, true)
})
}
16 changes: 8 additions & 8 deletions host_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type HostClient interface {
Ping(ctx context.Context) (t time.Duration, err error)
Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error)
Commit(ctx context.Context, shardID uint64, cmd []byte) (err error)
Read(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error)
Watch(ctx context.Context, shardID uint64, query []byte, results chan<- *Result, stale ...bool) (err error)
Read(ctx context.Context, shardID uint64, query []byte, stale bool) (value uint64, data []byte, err error)
Watch(ctx context.Context, shardID uint64, query []byte, results chan<- *Result, stale bool) (err error)
}

type hostclient struct {
Expand Down Expand Up @@ -87,18 +87,18 @@ func (c *hostclient) Commit(ctx context.Context, shardID uint64, cmd []byte) (er
return
}

func (c *hostclient) Read(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error) {
func (c *hostclient) Read(ctx context.Context, shardID uint64, query []byte, stale bool) (value uint64, data []byte, err error) {
var res *internal.ReadResponse
if c.hostID == c.agent.HostID() {
res, err = c.agent.grpcServer.Read(ctx, &internal.ReadRequest{
ShardId: shardID,
Stale: len(stale) > 0 && stale[0],
Stale: stale,
Data: query,
})
} else {
res, err = c.agent.grpcClientPool.get(c.hostApiAddress).Read(ctx, &internal.ReadRequest{
ShardId: shardID,
Stale: len(stale) > 0 && stale[0],
Stale: stale,
Data: query,
})
}
Expand Down Expand Up @@ -136,18 +136,18 @@ func (s *watchServer) Send(res *internal.WatchResponse) error {
return nil
}

func (c *hostclient) Watch(ctx context.Context, shardID uint64, query []byte, results chan<- *Result, stale ...bool) (err error) {
func (c *hostclient) Watch(ctx context.Context, shardID uint64, query []byte, results chan<- *Result, stale bool) (err error) {
var client internal.Zongzi_WatchClient
if c.hostID == c.agent.HostID() {
err = c.agent.grpcServer.Watch(&internal.WatchRequest{
ShardId: shardID,
Stale: len(stale) > 0 && stale[0],
Stale: stale,
Data: query,
}, newWatchServer(ctx, results))
} else {
client, err = c.agent.grpcClientPool.get(c.hostApiAddress).Watch(ctx, &internal.WatchRequest{
ShardId: shardID,
Stale: len(stale) > 0 && stale[0],
Stale: stale,
Data: query,
})
for {
Expand Down
20 changes: 10 additions & 10 deletions shard_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
type ShardClient interface {
Apply(ctx context.Context, cmd []byte) (value uint64, data []byte, err error)
Commit(ctx context.Context, cmd []byte) (err error)
Read(ctx context.Context, query []byte, stale ...bool) (value uint64, data []byte, err error)
Watch(ctx context.Context, query []byte, results chan<- *Result, stale ...bool) (err error)
Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error)
Watch(ctx context.Context, query []byte, results chan<- *Result, stale bool) (err error)
}

// The shardClient
Expand Down Expand Up @@ -59,15 +59,15 @@ func (c *shardClient) Commit(ctx context.Context, cmd []byte) (err error) {
return
}

func (c *shardClient) Read(ctx context.Context, query []byte, stale ...bool) (value uint64, data []byte, err error) {
func (c *shardClient) Read(ctx context.Context, query []byte, stale bool) (value uint64, data []byte, err error) {
var run bool
if len(stale) > 0 && stale[0] {
if stale {
c.manager.mutex.RLock()
el := c.manager.clientReplica[c.shardID].Front()
c.manager.mutex.RUnlock()
for ; el != nil; el = el.Next() {
run = true
value, data, err = el.Value.Read(ctx, c.shardID, query)
value, data, err = el.Value.Read(ctx, c.shardID, query, stale)
if err == nil {
break
}
Expand All @@ -80,23 +80,23 @@ func (c *shardClient) Read(ctx context.Context, query []byte, stale ...bool) (va
el := c.manager.clientMember[c.shardID].Front()
c.manager.mutex.RUnlock()
for ; el != nil; el = el.Next() {
value, data, err = el.Value.Read(ctx, c.shardID, query)
value, data, err = el.Value.Read(ctx, c.shardID, query, stale)
if err == nil {
break
}
}
return
}

func (c *shardClient) Watch(ctx context.Context, query []byte, results chan<- *Result, stale ...bool) (err error) {
func (c *shardClient) Watch(ctx context.Context, query []byte, results chan<- *Result, stale bool) (err error) {
var run bool
if len(stale) > 0 && stale[0] {
if stale {
c.manager.mutex.RLock()
el := c.manager.clientReplica[c.shardID].Front()
c.manager.mutex.RUnlock()
for ; el != nil; el = el.Next() {
run = true
err = el.Value.Watch(ctx, c.shardID, query, results, stale...)
err = el.Value.Watch(ctx, c.shardID, query, results, stale)
if err == nil {
break
}
Expand All @@ -109,7 +109,7 @@ func (c *shardClient) Watch(ctx context.Context, query []byte, results chan<- *R
el := c.manager.clientMember[c.shardID].Front()
c.manager.mutex.RUnlock()
for ; el != nil; el = el.Next() {
err = el.Value.Watch(ctx, c.shardID, query, results, stale...)
err = el.Value.Watch(ctx, c.shardID, query, results, stale)
if err == nil {
break
}
Expand Down
4 changes: 2 additions & 2 deletions shard_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *shardClientManager) tick() {
var shardCount int
var replicaCount int
var pings = map[string]time.Duration{}
err = c.agent.Read(c.ctx, func(state *State) {
err = c.agent.ReadStale(func(state *State) {
state.ShardIterateUpdatedAfter(c.index, func(shard Shard) bool {
shardCount++
index = shard.Updated
Expand Down Expand Up @@ -119,7 +119,7 @@ func (c *shardClientManager) tick() {
c.mutex.Unlock()
return true
})
}, true)
})
if err == nil && shardCount > 0 {
c.log.Infof("%s Shard client manager updated. hosts: %d shards: %d replicas: %d time: %vms", c.agent.HostID(), len(pings), shardCount, replicaCount, int(c.clock.Since(start)/time.Millisecond))
c.index = index
Expand Down

0 comments on commit ff651dd

Please sign in to comment.