Skip to content

Commit

Permalink
added rolling-update feature
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed Nov 20, 2024
1 parent 86f7e7a commit b943806
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ import (
"github.com/dobyte/due/v2/network"
"github.com/dobyte/due/v2/registry"
"github.com/dobyte/due/v2/session"
"sync"
"sync/atomic"
)

type Gate struct {
component.Base
opts *options
ctx context.Context
cancel context.CancelFunc
state atomic.Int32
proxy *proxy
instance *registry.ServiceInstance
session *session.Session
linker *gate.Server
wg *sync.WaitGroup
}

func NewGate(opts ...Option) *Gate {
Expand All @@ -43,6 +47,8 @@ func NewGate(opts ...Option) *Gate {
g.ctx, g.cancel = context.WithCancel(o.ctx)
g.proxy = newProxy(g)
g.session = session.NewSession()
g.state.Store(int32(cluster.Shut))
g.wg = &sync.WaitGroup{}

return g
}
Expand Down Expand Up @@ -73,6 +79,10 @@ func (g *Gate) Init() {

// Start 启动组件
func (g *Gate) Start() {
if !g.state.CompareAndSwap(int32(cluster.Shut), int32(cluster.Work)) {
return
}

g.startNetworkServer()

g.startLinkerServer()
Expand All @@ -84,8 +94,25 @@ func (g *Gate) Start() {
g.printInfo()
}

// Close 关闭节点
func (g *Gate) Close() {
if !g.state.CompareAndSwap(int32(cluster.Work), int32(cluster.Hang)) {
if !g.state.CompareAndSwap(int32(cluster.Busy), int32(cluster.Hang)) {
return
}
}

g.registerServiceInstance()

g.wg.Wait()
}

// Destroy 销毁组件
func (g *Gate) Destroy() {
if !g.state.CompareAndSwap(int32(cluster.Hang), int32(cluster.Shut)) {
return
}

g.deregisterServiceInstance()

g.stopNetworkServer()
Expand Down Expand Up @@ -115,9 +142,12 @@ func (g *Gate) stopNetworkServer() {

// 处理连接打开
func (g *Gate) handleConnect(conn network.Conn) {
g.wg.Add(1)

g.session.AddConn(conn)

cid, uid := conn.ID(), conn.UID()

ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
g.proxy.trigger(ctx, cluster.Connect, cid, uid)
cancel()
Expand All @@ -137,6 +167,8 @@ func (g *Gate) handleDisconnect(conn network.Conn) {
g.proxy.trigger(ctx, cluster.Disconnect, cid, uid)
cancel()
}

g.wg.Done()
}

// 处理接收到的消息
Expand Down Expand Up @@ -177,7 +209,7 @@ func (g *Gate) registerServiceInstance() {
Name: cluster.Gate.String(),
Kind: cluster.Gate.String(),
Alias: g.opts.name,
State: cluster.Work.String(),
State: g.getState().String(),
Endpoint: g.linker.Endpoint().String(),
}

Expand All @@ -199,6 +231,11 @@ func (g *Gate) deregisterServiceInstance() {
}
}

// 获取状态
func (g *Gate) getState() cluster.State {
return cluster.State(g.state.Load())
}

// 打印组件信息
func (g *Gate) printInfo() {
infos := make([]string, 0)
Expand Down

0 comments on commit b943806

Please sign in to comment.