Skip to content

Commit

Permalink
fix(registry/consul): fix concurrency issues and improve performance (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lftk authored Jan 24, 2025
1 parent 5087366 commit ff30a75
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 109 deletions.
105 changes: 80 additions & 25 deletions contrib/registry/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kratos/kratos/v2/log"
Expand All @@ -26,10 +27,8 @@ const (

// Client is consul client config
type Client struct {
dc Datacenter
cli *api.Client
ctx context.Context
cancel context.CancelFunc
dc Datacenter
cli *api.Client

// resolve service entry endpoints
resolver ServiceResolver
Expand All @@ -41,6 +40,16 @@ type Client struct {
deregisterCriticalServiceAfter int
// serviceChecks user custom checks
serviceChecks api.AgentServiceChecks

// used to control heartbeat
lock sync.RWMutex
cancelers map[string]*canceler
}

// canceler bundles the handles used to stop one service's heartbeat
// goroutine and to wait until that goroutine has finished.
type canceler struct {
// ctx is watched by the heartbeat loop and passed to its consul calls.
ctx context.Context
// cancel cancels ctx, asking the heartbeat loop to stop.
cancel context.CancelFunc
// done is closed when the heartbeat goroutine exits, or when the initial
// registration fails and no heartbeat is started.
done chan struct{}
}

func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance {
Expand Down Expand Up @@ -143,7 +152,7 @@ func (c *Client) singleDCEntries(service, tag string, passingOnly bool, opts *ap
}

// Register registers a service instance with consul
func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error {
func (c *Client) Register(ctx context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error {
addresses := make(map[string]api.ServiceAddress, len(svc.Endpoints))
checkAddresses := make([]string, 0, len(svc.Endpoints))
for _, endpoint := range svc.Endpoints {
Expand Down Expand Up @@ -190,13 +199,43 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
})
}

err := c.cli.Agent().ServiceRegister(asr)
c.lock.Lock()
if cc, ok := c.cancelers[svc.ID]; ok {
cc.cancel()
<-cc.done
}
var cc *canceler
if c.heartbeat {
cancelCtx, cancel := context.WithCancel(context.Background())
cc = &canceler{
ctx: cancelCtx,
cancel: cancel,
done: make(chan struct{}),
}
c.cancelers[svc.ID] = cc
go func() {
<-cc.done
cc.cancel()
c.lock.Lock()
if c.cancelers[svc.ID] == cc {
delete(c.cancelers, svc.ID)
}
c.lock.Unlock()
}()
}
c.lock.Unlock()

err := c.cli.Agent().ServiceRegisterOpts(asr, api.ServiceRegisterOpts{}.WithContext(ctx))
if err != nil {
if c.heartbeat {
close(cc.done)
}
return err
}

if c.heartbeat {
go func() {
time.Sleep(time.Second)
defer close(cc.done)
err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
if err != nil {
log.Errorf("[Consul]update ttl heartbeat to consul failed!err:=%v", err)
Expand All @@ -205,31 +244,23 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
_ = c.cli.Agent().ServiceDeregister(svc.ID)
return
default:
}
select {
case <-c.ctx.Done():
case <-cc.ctx.Done():
_ = c.cli.Agent().ServiceDeregister(svc.ID)
return
case <-ticker.C:
// ensure that unregistered services will not be re-registered by mistake
if errors.Is(c.ctx.Err(), context.Canceled) || errors.Is(c.ctx.Err(), context.DeadlineExceeded) {
_ = c.cli.Agent().ServiceDeregister(svc.ID)
return
}
err = c.cli.Agent().UpdateTTLOpts("service:"+svc.ID, "pass", "pass", new(api.QueryOptions).WithContext(c.ctx))
err = c.cli.Agent().UpdateTTLOpts("service:"+svc.ID, "pass", "pass", new(api.QueryOptions).WithContext(cc.ctx))
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
_ = c.cli.Agent().ServiceDeregister(svc.ID)
return
}
if err != nil {
log.Errorf("[Consul] update ttl heartbeat to consul failed! err=%v", err)
// when the previous report fails, try to re register the service
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
if err := c.cli.Agent().ServiceRegister(asr); err != nil {
if err := sleepCtx(cc.ctx, time.Duration(rand.Intn(5))*time.Second); err != nil {
_ = c.cli.Agent().ServiceDeregister(svc.ID)
return
}
if err := c.cli.Agent().ServiceRegisterOpts(asr, api.ServiceRegisterOpts{}.WithContext(cc.ctx)); err != nil {
log.Errorf("[Consul] re registry service failed!, err=%v", err)
} else {
log.Warn("[Consul] re registry of service occurred success")
Expand All @@ -242,8 +273,32 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
return nil
}

// sleepCtx blocks for d or until ctx is done, whichever happens first.
//
// It returns nil when the full duration elapsed and ctx.Err() when the wait
// was cut short by cancellation or deadline expiry. A context that is already
// done on entry always wins, even for a zero or negative d, so callers never
// continue working on behalf of a cancelled context.
func sleepCtx(ctx context.Context, d time.Duration) error {
	// Check cancellation up front: with a zero or very short d both select
	// cases below can be ready at once, and select would pick one at random.
	if err := ctx.Err(); err != nil {
		return err
	}
	if d <= 0 {
		// Nothing to wait for; skip the timer allocation.
		return nil
	}

	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

// Deregister service by service ID
func (c *Client) Deregister(_ context.Context, serviceID string) error {
defer c.cancel()
return c.cli.Agent().ServiceDeregister(serviceID)
func (c *Client) Deregister(ctx context.Context, serviceID string) error {
	// Look up the heartbeat handle under the read lock only; the wait below
	// happens with the lock released.
	c.lock.RLock()
	hb, running := c.cancelers[serviceID]
	c.lock.RUnlock()

	if running {
		// Ask the heartbeat goroutine to stop and wait until it has exited.
		hb.cancel()
		<-hb.done
	}

	agent := c.cli.Agent()
	err := agent.ServiceDeregisterOpts(serviceID, new(api.QueryOptions).WithContext(ctx))
	if err == nil {
		return nil
	}

	// A 404 from consul means the service is already gone, which is the
	// outcome the caller asked for.
	var statusErr api.StatusError
	if errors.As(err, &statusErr) && statusErr.Code == 404 {
		return nil
	}
	return err
}
47 changes: 38 additions & 9 deletions contrib/registry/consul/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ func New(apiClient *api.Client, opts ...Option) *Registry {
healthcheckInterval: 10,
heartbeat: true,
deregisterCriticalServiceAfter: 600,
cancelers: make(map[string]*canceler),
},
}
for _, o := range opts {
o(r)
}
r.cli.ctx, r.cli.cancel = context.WithCancel(context.Background())
return r
}

Expand All @@ -135,8 +135,8 @@ func (r *Registry) Deregister(ctx context.Context, svc *registry.ServiceInstance
// GetService returns the service instances registered under the given name
func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
r.lock.RLock()
defer r.lock.RUnlock()
set := r.registry[name]
r.lock.RUnlock()

getRemote := func() []*registry.ServiceInstance {
services, _, err := r.cli.Service(ctx, name, 0, true)
Expand Down Expand Up @@ -181,17 +181,26 @@ func (r *Registry) ListServices() (allServices map[string][]*registry.ServiceIns

// Watch creates a watcher for the service with the given name
func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

r.lock.Lock()
defer r.lock.Unlock()
set, ok := r.registry[name]
if !ok {
cancelCtx, cancel := context.WithCancel(context.Background())
set = &serviceSet{
registry: r,
watcher: make(map[*watcher]struct{}),
services: &atomic.Value{},
serviceName: name,
ctx: cancelCtx,
cancel: cancel,
}
r.registry[name] = set
}
set.ref.Add(1)
r.lock.Unlock()

// init watcher
w := &watcher{
Expand All @@ -202,14 +211,21 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
set.lock.Lock()
set.watcher[w] = struct{}{}
set.lock.Unlock()

ss, _ := set.services.Load().([]*registry.ServiceInstance)
if len(ss) > 0 {
// If the service has a value, it needs to be pushed to the watcher,
// otherwise the initial data may be blocked forever during the watch.
w.event <- struct{}{}
select {
case w.event <- struct{}{}:
default:
}
}
if err := r.resolve(ctx, set); err != nil {
return nil, err

if !ok {
if err := r.resolve(ctx, set); err != nil {
return nil, err
}
}
return w, nil
}
Expand Down Expand Up @@ -239,21 +255,34 @@ func (r *Registry) resolve(ctx context.Context, ss *serviceSet) error {
for {
select {
case <-ticker.C:
tmpService, tmpIdx, err := listServices(context.Background(), ss.serviceName, idx, true)
tmpService, tmpIdx, err := listServices(ss.ctx, ss.serviceName, idx, true)
if err != nil {
time.Sleep(time.Second)
if err := sleepCtx(ss.ctx, time.Second); err != nil {
return
}
continue
}
if len(tmpService) != 0 && tmpIdx != idx {
services = tmpService
ss.broadcast(services)
}
idx = tmpIdx
case <-ctx.Done():
case <-ss.ctx.Done():
return
}
}
}()

return nil
}

// tryDelete drops one reference to ss while holding the registry lock.
// When the count reaches zero it cancels the set and removes it from the
// registry map, returning true; otherwise the set is kept and it returns false.
func (r *Registry) tryDelete(ss *serviceSet) bool {
	r.lock.Lock()
	defer r.lock.Unlock()

	remaining := ss.ref.Add(-1)
	if remaining == 0 {
		ss.cancel()
		delete(r.registry, ss.serviceName)
		return true
	}
	return false
}
Loading

0 comments on commit ff30a75

Please sign in to comment.