Skip to content

Commit

Permalink
[fmt] grpcServer
Browse files Browse the repository at this point in the history
  • Loading branch information
kevburnsjr committed Jan 22, 2025
1 parent 21c10a2 commit 68038ed
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,57 +27,71 @@ func newGrpcServer(listenAddr string, opts ...grpc.ServerOption) *grpcServer {
}
}

func (s *grpcServer) Ping(ctx context.Context, req *internal.PingRequest) (res *internal.PingResponse, err error) {
func (s *grpcServer) Ping(ctx context.Context,
req *internal.PingRequest,
) (resp *internal.PingResponse, err error) {
return &internal.PingResponse{}, nil
}

func (s *grpcServer) Probe(ctx context.Context, req *internal.ProbeRequest) (res *internal.ProbeResponse, err error) {
func (s *grpcServer) Probe(ctx context.Context,
req *internal.ProbeRequest,
) (resp *internal.ProbeResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Probe: %#v`, req)
return &internal.ProbeResponse{
GossipAdvertiseAddress: s.agent.hostConfig.Gossip.AdvertiseAddress,
}, nil
}

func (s *grpcServer) Info(ctx context.Context, req *internal.InfoRequest) (res *internal.InfoResponse, err error) {
func (s *grpcServer) Info(ctx context.Context,
req *internal.InfoRequest,
) (resp *internal.InfoResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Info: %#v`, req)
return &internal.InfoResponse{
HostId: s.agent.hostID(),
ReplicaId: s.agent.replicaConfig.ReplicaID,
}, nil
}

func (s *grpcServer) Members(ctx context.Context, req *internal.MembersRequest) (res *internal.MembersResponse, err error) {
func (s *grpcServer) Members(ctx context.Context,
req *internal.MembersRequest,
) (resp *internal.MembersResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Members: %#v`, req)
return &internal.MembersResponse{
Members: s.agent.members,
}, nil
}

func (s *grpcServer) Join(ctx context.Context, req *internal.JoinRequest) (res *internal.JoinResponse, err error) {
func (s *grpcServer) Join(ctx context.Context,
req *internal.JoinRequest,
) (resp *internal.JoinResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Join: %#v`, req)
res = &internal.JoinResponse{}
resp = &internal.JoinResponse{}
if s.agent.Status() != AgentStatus_Ready {
err = ErrAgentNotReady
return
}
res.Value, err = s.agent.joinPrimeReplica(req.HostId, s.agent.replicaConfig.ShardID, req.IsNonVoting)
resp.Value, err = s.agent.joinPrimeReplica(req.HostId, s.agent.replicaConfig.ShardID, req.IsNonVoting)
return
}

func (s *grpcServer) Add(ctx context.Context, req *internal.AddRequest) (res *internal.AddResponse, err error) {
func (s *grpcServer) Add(ctx context.Context,
req *internal.AddRequest,
) (resp *internal.AddResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Join: %#v`, req)
res = &internal.AddResponse{}
resp = &internal.AddResponse{}
if s.agent.Status() != AgentStatus_Ready {
err = ErrAgentNotReady
return
}
res.Value, err = s.agent.joinShardReplica(req.HostId, req.ShardId, req.ReplicaId, req.IsNonVoting)
resp.Value, err = s.agent.joinShardReplica(req.HostId, req.ShardId, req.ReplicaId, req.IsNonVoting)
return
}

var emptyCommitResponse = &internal.CommitResponse{}

func (s *grpcServer) Commit(ctx context.Context, req *internal.CommitRequest) (res *internal.CommitResponse, err error) {
func (s *grpcServer) Commit(ctx context.Context,
req *internal.CommitRequest,
) (resp *internal.CommitResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Propose: %#v`, req)
if !s.agent.hostConfig.NotifyCommit {
s.agent.log.Warningf(`%v`, ErrNotifyCommitDisabled)
Expand All @@ -95,7 +109,7 @@ func (s *grpcServer) Commit(ctx context.Context, req *internal.CommitRequest) (r
select {
case r := <-rs.ResultC():
if r.Committed() {
res = emptyCommitResponse
resp = emptyCommitResponse
} else if r.Aborted() {
err = ErrAborted
} else if r.Dropped() {
Expand All @@ -114,14 +128,16 @@ func (s *grpcServer) Commit(ctx context.Context, req *internal.CommitRequest) (r
err = ErrTimeout
}
}
if err != nil || res != nil {
if err != nil || resp != nil {
break
}
}
return
}

func (s *grpcServer) Apply(ctx context.Context, req *internal.ApplyRequest) (res *internal.ApplyResponse, err error) {
func (s *grpcServer) Apply(ctx context.Context,
req *internal.ApplyRequest,
) (resp *internal.ApplyResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Propose: %#v`, req)
if s.agent.Status() != AgentStatus_Ready {
err = ErrAgentNotReady
Expand All @@ -136,7 +152,7 @@ func (s *grpcServer) Apply(ctx context.Context, req *internal.ApplyRequest) (res
select {
case r := <-rs.ResultC():
if r.Completed() {
res = &internal.ApplyResponse{
resp = &internal.ApplyResponse{
Value: r.GetResult().Value,
Data: r.GetResult().Data,
}
Expand All @@ -161,23 +177,29 @@ func (s *grpcServer) Apply(ctx context.Context, req *internal.ApplyRequest) (res
err = ErrTimeout
}
}
if err != nil || res != nil {
if err != nil || resp != nil {
break
}
}
return
}

func (s *grpcServer) Index(ctx context.Context, req *internal.IndexRequest) (res *internal.IndexResponse, err error) {
var emptyIndexResponse = &internal.IndexResponse{}

func (s *grpcServer) Index(ctx context.Context,
req *internal.IndexRequest,
) (resp *internal.IndexResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Query: %#v`, req)
res = &internal.IndexResponse{}
resp = emptyIndexResponse
err = s.agent.index(ctx, req.ShardId)
return
}

func (s *grpcServer) Read(ctx context.Context, req *internal.ReadRequest) (res *internal.ReadResponse, err error) {
func (s *grpcServer) Read(ctx context.Context,
req *internal.ReadRequest,
) (resp *internal.ReadResponse, err error) {
// s.agent.log.Debugf(`gRPC Req Query: %#v`, req)
res = &internal.ReadResponse{}
resp = &internal.ReadResponse{}
query := getLookupQuery()
query.ctx = ctx
query.data = req.Data
Expand All @@ -191,8 +213,8 @@ func (s *grpcServer) Read(ctx context.Context, req *internal.ReadRequest) (res *
r, err = s.agent.host.SyncRead(ctx, req.ShardId, query)
}
if result, ok := r.(*Result); ok && result != nil {
res.Value = result.Value
res.Data = result.Data
resp.Value = result.Value
resp.Data = result.Data
// Result cannot be released because ReadResponse may not be serialized
// This occurs as an optimization in hostClient for requests that do not require forwarding
// ReleaseResult(result)
Expand Down

0 comments on commit 68038ed

Please sign in to comment.