Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
kevburnsjr committed Nov 9, 2024
1 parent 42f1d1e commit 69ae19e
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 134 deletions.
2 changes: 1 addition & 1 deletion _example/kv1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
if err != nil {
panic(err)
}
clients[i] = agent.GetClient(shard.ID)
clients[i] = agent.Client(shard.ID)
}
// Start HTTP API
go func(s *http.Server) {
Expand Down
224 changes: 112 additions & 112 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,93 @@ func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent
return a, nil
}

// Client returns a client for a specific shard.
// It will send writes to the nearest member and send reads to the nearest replica (by ping).
func (a *Agent) Client(shardID uint64) (c ShardClient) {
c, _ = newClient(a.clientManager, shardID)
return
}

// ShardCreate creates a new shard. If shard name option is provided and shard exists, found shard is returned.
func (a *Agent) ShardCreate(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error) {
shard = Shard{
Status: ShardStatus_New,
Type: uri,
Tags: map[string]string{},
}
for _, opt := range opts {
if err = opt(&shard); err != nil {
return
}
}
if len(shard.Name) > 0 {
var ok bool
var found Shard
a.State(ctx, func(state *State) {
found, ok = state.ShardFindByName(shard.Name)
})
if ok {
shard = found
return
}
}
res, err := a.primePropose(newCmdShardPost(shard))
if err != nil {
return
}
shard.ID = res.Value
created = true
a.log.Infof("Shard created %s, %d, %s", uri, shard.ID, shard.Name)
return
}

// ShardDelete deletes a shard.
func (a *Agent) ShardDelete(ctx context.Context, id uint64) (err error) {
res, err := a.primePropose(newCmdShardDel(id))
if err != nil {
return
}
if res.Value == 0 {
err = fmt.Errorf("Error deleting shard %d: %s", id, string(res.Data))
return
}
a.log.Infof("Shard deleted (%d)", id)
return
}

// ShardFind returns a shard by id.
func (a *Agent) ShardFind(ctx context.Context, id uint64) (shard Shard, err error) {
err = a.State(ctx, func(state *State) {
shard, _ = state.Shard(id)
})
return
}

// ShardUpdate creates a new shard. If shard name option is provided and shard exists, found shard is returned.
func (a *Agent) ShardUpdate(ctx context.Context, id uint64, opts ...ShardOption) (shard Shard, err error) {
var found Shard
var ok bool
a.State(ctx, func(state *State) {
found, ok = state.Shard(id)
})
if !ok {
err = ErrShardNotFound
return
}
for _, opt := range opts {
opt(&found)
}
shard = found
res, err := a.primePropose(newCmdShardPut(shard))
if err != nil {
return
}
shard.ID = res.Value
a.log.Infof("Shard updated (%d)", id)
return
}

// Start starts the agent, bootstrapping the cluster if required.
func (a *Agent) Start(ctx context.Context) (err error) {
var init bool
defer func() {
Expand Down Expand Up @@ -212,31 +299,6 @@ func (a *Agent) Start(ctx context.Context) (err error) {
return
}

// hostClient returns a Client for a specific host.
func (a *Agent) hostClient(hostID string) (c hostClient) {
a.State(a.ctx, func(s *State) {
host, ok := s.Host(hostID)
if ok {
c = newhostClient(a, host)
}
})
return
}

// GetClient returns a client for a specific shard.
// It will send writes to the nearest member and send reads to the nearest replica (by ping).
func (a *Agent) GetClient(shardID uint64) (c ShardClient) {
c, _ = newClient(a.clientManager, shardID)
return
}

// Status returns the agent status
func (a *Agent) Status() AgentStatus {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.status
}

// State executes a callback function passing a snapshot of the cluster state.
//
// err := agent.State(ctx, func(s *State) error {
Expand Down Expand Up @@ -265,85 +327,6 @@ func (a *Agent) State(ctx context.Context, fn func(*State)) (err error) {
return
}

// ShardCreate creates a new shard. If shard name option is provided and shard exists, found shard is returned.
func (a *Agent) ShardCreate(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error) {
shard = Shard{
Status: ShardStatus_New,
Type: uri,
Tags: map[string]string{},
}
for _, opt := range opts {
if err = opt(&shard); err != nil {
return
}
}
if len(shard.Name) > 0 {
var ok bool
var found Shard
a.State(ctx, func(state *State) {
found, ok = state.ShardFindByName(shard.Name)
})
if ok {
shard = found
return
}
}
res, err := a.primePropose(newCmdShardPost(shard))
if err != nil {
return
}
shard.ID = res.Value
created = true
a.log.Infof("Shard created %s, %d, %s", uri, shard.ID, shard.Name)
return
}

// ShardDelete deletes a shard.
func (a *Agent) ShardDelete(ctx context.Context, id uint64) (err error) {
res, err := a.primePropose(newCmdShardDel(id))
if err != nil {
return
}
if res.Value == 0 {
err = fmt.Errorf("Error deleting shard %d: %s", id, string(res.Data))
return
}
a.log.Infof("Shard deleted (%d)", id)
return
}

// ShardFind returns a shard by id.
func (a *Agent) ShardFind(ctx context.Context, id uint64) (shard Shard, err error) {
err = a.State(ctx, func(state *State) {
shard, _ = state.Shard(id)
})
return
}

// ShardUpdate creates a new shard. If shard name option is provided and shard exists, found shard is returned.
func (a *Agent) ShardUpdate(ctx context.Context, id uint64, opts ...ShardOption) (shard Shard, err error) {
var found Shard
var ok bool
a.State(ctx, func(state *State) {
found, ok = state.ShardFindByName(shard.Name)
})
if !ok {
err = ErrShardNotFound
return
}
for _, opt := range opts {
opt(&found)
}
shard = found
res, err := a.primePropose(newCmdShardPut(shard))
if err != nil {
return
}
shard.ID = res.Value
a.log.Infof("Shard updated (%d)", id)
return
}

// StateMachineRegister registers a non-persistent shard type. Call before Starting agent.
func (a *Agent) StateMachineRegister(uri string, factory any, config ...ReplicaConfig) (err error) {
cfg := DefaultReplicaConfig
Expand All @@ -366,15 +349,14 @@ func (a *Agent) StateMachineRegister(uri string, factory any, config ...ReplicaC
return
}

// hFostID returns host ID if host is initialized, otherwise empty string.
func (a *Agent) hostID() (id string) {
if a.host != nil {
return a.host.ID()
}
return ""
// Status returns the agent status.
func (a *Agent) Status() AgentStatus {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.status
}

// Stop stops the agent
// Stop stops the agent.
func (a *Agent) Stop() {
a.controllerManager.Stop()
a.hostController.Stop()
Expand All @@ -387,6 +369,25 @@ func (a *Agent) Stop() {
a.setStatus(AgentStatus_Stopped)
}

// hostID returns host ID if host is initialized, otherwise empty string.
func (a *Agent) hostID() (id string) {
if a.host != nil {
return a.host.ID()
}
return ""
}

// hostClient returns a Client for a specific host.
func (a *Agent) hostClient(hostID string) (c hostClient) {
a.State(a.ctx, func(s *State) {
host, ok := s.Host(hostID)
if ok {
c = newhostClient(a, host)
}
})
return
}

// tagsSet sets tags on an item (Host, Shard or Replica). Overwrites if tag is already present.
func (a *Agent) tagsSet(item any, tags ...string) (err error) {
_, err = a.primePropose(newCmdTagsSet(item, tags...))
Expand Down Expand Up @@ -740,7 +741,6 @@ func (a *Agent) primeInit(members map[uint64]string) (err error) {
return
}
}

return
}

Expand Down
2 changes: 1 addition & 1 deletion agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func runAgentSubTestByShard(t *testing.T, agents []*Agent, shard Shard, op strin
var val uint64
for _, a := range agents {
val = 0
client := a.GetClient(shard.ID)
client := a.Client(shard.ID)
require.NotNil(t, client)
if op == "update" && stale {
err = client.Commit(raftCtx(), bytes.Repeat([]byte("test"), i+1))
Expand Down
Loading

0 comments on commit 69ae19e

Please sign in to comment.