diff --git a/cluster/node/actor.go b/cluster/node/actor.go index 161db99..edf9262 100644 --- a/cluster/node/actor.go +++ b/cluster/node/actor.go @@ -116,7 +116,7 @@ func (a *Actor) Next(ctx Context) { a.mailbox <- ctx } -// Deliver 投递消息到Actor中进行处理 +// Deliver 投递消息到当前Actor中进行处理 func (a *Actor) Deliver(uid int64, message *cluster.Message) { req := a.scheduler.node.reqPool.Get().(*request) req.nid = a.scheduler.node.opts.id @@ -128,6 +128,11 @@ func (a *Actor) Deliver(uid int64, message *cluster.Message) { a.Next(req) } +// Push 推送消息到本地Node队列上进行处理 +func (a *Actor) Push(uid int64, message *cluster.Message) { + a.scheduler.node.router.deliver("", a.scheduler.node.opts.id, a.PID(), 0, uid, message.Seq, message.Route, message.Data) +} + // Destroy 销毁Actor func (a *Actor) Destroy() { if !a.state.CompareAndSwap(started, destroyed) { diff --git a/cluster/node/context.go b/cluster/node/context.go index d81f5b9..e86eace 100644 --- a/cluster/node/context.go +++ b/cluster/node/context.go @@ -44,6 +44,8 @@ type Context interface { Context() context.Context // GetIP 获取客户端IP GetIP() (string, error) + // Deliver 投递消息给节点处理 + Deliver(args *cluster.DeliverArgs) error // Reply 回复消息 Reply(message *cluster.Message) error // Response 响应消息 diff --git a/cluster/node/event.go b/cluster/node/event.go index 9b1636e..848afc5 100644 --- a/cluster/node/event.go +++ b/cluster/node/event.go @@ -223,6 +223,11 @@ func (e *event) GetIP() (string, error) { }) } +// Deliver 投递消息给节点处理 +func (e *event) Deliver(args *cluster.DeliverArgs) error { + return e.node.proxy.Deliver(e.ctx, args) +} + // Reply 回复消息 func (e *event) Reply(message *cluster.Message) error { return e.node.proxy.Push(e.ctx, &cluster.PushArgs{ diff --git a/cluster/node/provider.go b/cluster/node/provider.go index 69b1011..956d54a 100644 --- a/cluster/node/provider.go +++ b/cluster/node/provider.go @@ -76,7 +76,7 @@ func (p *provider) Deliver(ctx context.Context, gid, nid string, cid, uid int64, } } - p.node.router.deliver(gid, nid, cid, uid, msg.Seq, msg.Route, msg.Buffer) + p.node.router.deliver(gid, nid, "", cid, uid, msg.Seq, msg.Route, msg.Buffer) return nil } diff --git a/cluster/node/proxy.go b/cluster/node/proxy.go index 28a6a29..f25fe7c 100644 --- a/cluster/node/proxy.go +++ b/cluster/node/proxy.go @@ -245,9 +245,8 @@ func (p *Proxy) Broadcast(ctx context.Context, args *cluster.BroadcastArgs) erro // Deliver 投递消息给节点处理 func (p *Proxy) Deliver(ctx context.Context, args *cluster.DeliverArgs) error { - if args.NID == p.GetID() { - p.node.router.deliver("", args.NID, 0, args.UID, args.Message.Seq, args.Message.Route, args.Message.Data) - return nil + if args.NID == p.node.opts.id { + return errors.ErrIllegalOperation } return p.nodeLinker.Deliver(ctx, &link.DeliverArgs{ diff --git a/cluster/node/request.go b/cluster/node/request.go index 8dbb930..347dad6 100644 --- a/cluster/node/request.go +++ b/cluster/node/request.go @@ -23,6 +23,7 @@ type request struct { ctx context.Context // 上下文 gid string // 来源网关ID nid string // 来源节点ID + pid string // 来源Actor ID cid int64 // 连接ID uid int64 // 用户ID message *cluster.Message // 请求消息 @@ -271,14 +272,25 @@ func (r *request) Deliver(args *cluster.DeliverArgs) error { // Reply 回复消息 func (r *request) Reply(message *cluster.Message) error { switch { - case r.gid != "": + case r.gid != "": // 来源于网关 return r.node.proxy.Push(r.ctx, &cluster.PushArgs{ GID: r.gid, Kind: session.Conn, Target: r.cid, Message: message, }) - case r.nid != "": + case r.pid != "": // 来源于Actor + if actor, ok := r.node.scheduler.doLoad(r.pid); ok { + actor.Deliver(r.uid, message) + return nil + } else { + return errors.ErrIllegalOperation + } + case r.nid != "": // 来源于其他Node + if r.nid == r.node.opts.id { + return nil + } + return r.node.proxy.Deliver(r.ctx, &cluster.DeliverArgs{ NID: r.nid, UID: r.uid, diff --git a/cluster/node/router.go b/cluster/node/router.go index 6ce8540..66a5555 100644 --- a/cluster/node/router.go +++ b/cluster/node/router.go @@ -119,10 +119,11 @@ func (r *Router) Group(groups ...func(group *RouterGroup)) *RouterGroup { return group } -func (r *Router) deliver(gid, nid string, cid, uid int64, seq, route int32, data interface{}) { +func (r *Router) deliver(gid, nid, pid string, cid, uid int64, seq, route int32, data interface{}) { req := r.node.reqPool.Get().(*request) req.gid = gid req.nid = nid + req.pid = pid req.cid = cid req.uid = uid req.message.Seq = seq diff --git a/cluster/node/scheduler.go b/cluster/node/scheduler.go index cc8d9f9..0639201 100644 --- a/cluster/node/scheduler.go +++ b/cluster/node/scheduler.go @@ -96,7 +96,12 @@ func (s *Scheduler) remove(kind, id string) (*Actor, bool) { // 加载Actor func (s *Scheduler) load(kind, id string) (*Actor, bool) { - if actor, ok := s.actors.Load(kind + "/" + id); ok { + return s.doLoad(kind + "/" + id) +} + +// 执行加载Actor +func (s *Scheduler) doLoad(pid string) (*Actor, bool) { + if actor, ok := s.actors.Load(pid); ok { return actor.(*Actor), true }