Skip to content

Commit

Permalink
support mesh service
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed Apr 1, 2023
1 parent 93d46cc commit df3a612
Show file tree
Hide file tree
Showing 55 changed files with 964 additions and 225 deletions.
45 changes: 0 additions & 45 deletions .github/workflows/go.yml

This file was deleted.

1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const (
Master Kind = "master" // 管理服
Gate Kind = "gate" // 网关服
Node Kind = "node" // 节点服
Mesh Kind = "mesh" // 微服务
)

// State 集群实例状态
Expand Down
30 changes: 15 additions & 15 deletions cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (g *Gate) Init() {
func (g *Gate) Start() {
g.startNetworkServer()

g.startTransportServer()
g.startRPCServer()

g.registerServiceInstance()

Expand All @@ -99,7 +99,7 @@ func (g *Gate) Destroy() {

g.stopNetworkServer()

g.stopTransportServer()
g.stopRPCServer()

g.cancel()
}
Expand All @@ -111,14 +111,14 @@ func (g *Gate) startNetworkServer() {
g.opts.server.OnReceive(g.handleReceive)

if err := g.opts.server.Start(); err != nil {
log.Fatalf("the gate server startup failed: %v", err)
log.Fatalf("the network server start failed: %v", err)
}
}

// 停止网关服务器
func (g *Gate) stopNetworkServer() {
if err := g.opts.server.Stop(); err != nil {
log.Errorf("the gate server stop failed: %v", err)
log.Errorf("the network server stop failed: %v", err)
}
}

Expand Down Expand Up @@ -162,30 +162,30 @@ func (g *Gate) handleReceive(conn network.Conn, data []byte, _ int) {
err = g.proxy.deliver(ctx, conn.ID(), conn.UID(), message)
cancel()
if err != nil {
log.Errorf("deliver message failed: %v", err)
log.Warnf("deliver message failed: %v", err)
}
}

// 启动RPC服务器
func (g *Gate) startTransportServer() {
func (g *Gate) startRPCServer() {
var err error

g.rpc, err = g.opts.transporter.NewGateServer(&provider{g})
if err != nil {
log.Fatalf("the transport server build failed: %v", err)
log.Fatalf("the rpc server create failed: %v", err)
}

go func() {
if err = g.rpc.Start(); err != nil {
log.Fatalf("the transport server startup failed: %v", err)
log.Fatalf("the rpc server start failed: %v", err)
}
}()
}

// 停止RPC服务器
func (g *Gate) stopTransportServer() {
func (g *Gate) stopRPCServer() {
if err := g.rpc.Stop(); err != nil {
log.Errorf("the transport server stop failed: %v", err)
log.Errorf("the rpc server stop failed: %v", err)
}
}

Expand All @@ -204,7 +204,7 @@ func (g *Gate) registerServiceInstance() {
err := g.opts.registry.Register(ctx, g.instance)
cancel()
if err != nil {
log.Fatalf("the gate service instance register failed: %v", err)
log.Fatalf("register service instance failed: %v", err)
}
}

Expand All @@ -214,12 +214,12 @@ func (g *Gate) deregisterServiceInstance() {
err := g.opts.registry.Deregister(ctx, g.instance)
defer cancel()
if err != nil {
log.Errorf("the gate service instance deregister failed: %v", err)
log.Errorf("deregister service instance failed: %v", err)
}
}

func (g *Gate) debugPrint() {
log.Debugf("The gate server startup successful")
log.Debugf("Network server, listen: %s protocol: %s", xnet.FulfillAddr(g.opts.server.Addr()), g.opts.server.Protocol())
log.Debugf("Transport server, listen: %s protocol: %s", xnet.FulfillAddr(g.rpc.Addr()), g.rpc.Scheme())
log.Debugf("the gate server startup successful")
log.Debugf("the %s server listen on %s", g.opts.server.Protocol(), xnet.FulfillAddr(g.opts.server.Addr()))
log.Debugf("the %s server listen on %s", g.rpc.Scheme(), xnet.FulfillAddr(g.rpc.Addr()))
}
2 changes: 1 addition & 1 deletion cluster/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ func (m *Master) Proxy() *Proxy {
}

func (m *Master) debugPrint() {
log.Debugf("The master server startup successful")
log.Debugf("the master server startup successful")
}
9 changes: 9 additions & 0 deletions cluster/master/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/dobyte/due/internal/link"
"github.com/dobyte/due/registry"
"github.com/dobyte/due/session"
"github.com/dobyte/due/transport"
)

var (
Expand Down Expand Up @@ -47,6 +48,14 @@ func (p *Proxy) GetMasterName() string {
return p.master.opts.name
}

// NewServiceClient 新建微服务客户端
// target参数可分为两种模式:
// 直连模式: direct://127.0.0.1:8011
// 服务发现模式: discovery://service_name
func (p *Proxy) NewServiceClient(target string) (transport.ServiceClient, error) {
return p.master.opts.transporter.NewServiceClient(target)
}

// LocateGate 定位用户所在网关
func (p *Proxy) LocateGate(ctx context.Context, uid int64) (string, error) {
return p.link.LocateGate(ctx, uid)
Expand Down
187 changes: 187 additions & 0 deletions cluster/mesh/mesh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package mesh

import (
"context"
"github.com/dobyte/due/cluster"
"github.com/dobyte/due/component"
"github.com/dobyte/due/log"
"github.com/dobyte/due/registry"
"github.com/dobyte/due/transport"
"github.com/dobyte/due/utils/xnet"
"github.com/dobyte/due/utils/xuuid"
"golang.org/x/sync/errgroup"
"time"
)

type Mesh struct {
component.Base
opts *options
ctx context.Context
cancel context.CancelFunc
state cluster.State
proxy *Proxy
services []*serviceEntity
instances []*registry.ServiceInstance
rpc transport.Server
}

type serviceEntity struct {
name string // 服务名称;用于定位服务发现
desc interface{} // 服务描述(grpc为desc描述对象; rpcx为服务路径)
provider interface{} // 服务提供者
}

func NewMesh(opts ...Option) *Mesh {
o := defaultOptions()
for _, opt := range opts {
opt(o)
}

m := &Mesh{}
m.opts = o
m.state = cluster.Shut
m.services = make([]*serviceEntity, 0)
m.instances = make([]*registry.ServiceInstance, 0)
m.proxy = newProxy(m)
m.ctx, m.cancel = context.WithCancel(o.ctx)

return m
}

// Name 组件名称
func (m *Mesh) Name() string {
return m.opts.name
}

// Init 初始化节点
func (m *Mesh) Init() {
if m.opts.codec == nil {
log.Fatal("codec component is not injected")
}

if m.opts.locator == nil {
log.Fatal("locator component is not injected")
}

if m.opts.registry == nil {
log.Fatal("registry component is not injected")
}

if m.opts.transporter == nil {
log.Fatal("transporter component is not injected")
}
}

// Start 启动
func (m *Mesh) Start() {
m.state = cluster.Work

m.startRPCServer()

m.registerServiceInstances()

m.proxy.watch(m.ctx)

m.debugPrint()
}

// Destroy 销毁网关服务器
func (m *Mesh) Destroy() {
m.deregisterServiceInstances()

m.stopRPCServer()

m.cancel()
}

// Proxy 获取节点代理
func (m *Mesh) Proxy() *Proxy {
return m.proxy
}

// 启动RPC服务器
func (m *Mesh) startRPCServer() {
var err error

m.rpc, err = m.opts.transporter.NewServiceServer()
if err != nil {
log.Fatalf("the rpc server create failed: %v", err)
}

for _, entity := range m.services {
err = m.rpc.RegisterService(entity.desc, entity.provider)
if err != nil {
log.Fatalf("register service failed: %v", err)
}
}

go func() {
if err = m.rpc.Start(); err != nil {
log.Fatalf("the rpc server start failed: %v", err)
}
}()
}

// 停止RPC服务器
func (m *Mesh) stopRPCServer() {
if err := m.rpc.Stop(); err != nil {
log.Errorf("the rpc server stop failed: %v", err)
}
}

// 注册服务实例
func (m *Mesh) registerServiceInstances() {
endpoint := m.rpc.Endpoint().String()

for _, entity := range m.services {
id, err := xuuid.UUID()
if err != nil {
log.Fatalf("generate service id failed: %v", err)
}

m.instances = append(m.instances, &registry.ServiceInstance{
ID: id,
Name: entity.name,
Kind: cluster.Mesh,
Alias: entity.name,
State: cluster.Work,
Endpoint: endpoint,
})
}

eg, ctx := errgroup.WithContext(m.ctx)
for i := range m.instances {
instance := m.instances[i]
eg.Go(func() error {
rctx, rcancel := context.WithTimeout(ctx, 10*time.Second)
defer rcancel()
return m.opts.registry.Register(rctx, instance)
})
}

if err := eg.Wait(); err != nil {
log.Fatalf("register service instance failed: %v", err)
}
}

// 解注册服务实例
func (m *Mesh) deregisterServiceInstances() {
eg, ctx := errgroup.WithContext(m.ctx)
for i := range m.instances {
instance := m.instances[i]
eg.Go(func() error {
dctx, dcancel := context.WithTimeout(ctx, 10*time.Second)
defer dcancel()
return m.opts.registry.Deregister(dctx, instance)
})
}

if err := eg.Wait(); err != nil {
log.Errorf("deregister service instance failed: %v", err)
}
}

func (m *Mesh) debugPrint() {
log.Debugf("the mesh server startup successful")
log.Debugf("the %s server listen on %s", m.rpc.Scheme(), xnet.FulfillAddr(m.rpc.Addr()))
}
Loading

0 comments on commit df3a612

Please sign in to comment.