From 95d1e5acfcb1b1ad9487148a780c694243c3b043 Mon Sep 17 00:00:00 2001 From: novice2194 <2194150786@qq.com> Date: Sun, 26 Feb 2023 18:40:51 +0800 Subject: [PATCH 1/4] fix: goroutine bug --- grpcrun/go_grpc.go | 56 ++++++++++++++++++++++------------- grpcrun/grpcrun_test.go | 65 +++++++++++++++++++++++++++++++++-------- grpcrun/task.go | 9 +++--- 3 files changed, 92 insertions(+), 38 deletions(-) diff --git a/grpcrun/go_grpc.go b/grpcrun/go_grpc.go index 0931ce8..ac34f4a 100644 --- a/grpcrun/go_grpc.go +++ b/grpcrun/go_grpc.go @@ -15,12 +15,16 @@ import ( var ( mu sync.Mutex + log *zap.Logger node *snowflake.Node ) func init() { var err error mu = sync.Mutex{} + log, _ = zap.NewDevelopment() + zap.ReplaceGlobals(log) + // zap.ReplaceGlobals(&log) if node, err = snowflake.NewNode(int64(time.Now().Day())); err != nil { panic(err) } @@ -37,36 +41,40 @@ func init() { // run.Wait() // } type GoGrpc struct { - mu sync.Mutex - ctx context.Context - cancel context.CancelFunc - wait sync.WaitGroup - Timeout time.Duration - Task map[string]*GrpcTask + // mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + wait sync.WaitGroup + time time.Duration + Task map[string]*GrpcTask } func NewGoGrpc() *GoGrpc { mu.Lock() defer mu.Unlock() g := GoGrpc{} - g.ctx, g.cancel = context.WithTimeout(context.Background(), 3*time.Second) - g.mu = sync.Mutex{} + // g.mu = sync.Mutex{} + g.time = 3 * time.Second g.wait = sync.WaitGroup{} g.Task = make(map[string]*GrpcTask, 0) + g.ctx, g.cancel = context.WithTimeout(context.Background(), g.time) return &g } // SetTimeout reset timeout, replace default timeout with a special time duration func (g *GoGrpc) SetTimeout(timeout time.Duration) { mu.Lock() - mu.Unlock() - g.ctx, g.cancel = context.WithTimeout(context.Background(), timeout) + defer mu.Unlock() + g.time = timeout } func (g *GoGrpc) Run() { - for _, t := range g.Task { - go g.run(t) + for key, _ := range g.Task { + go func(k string) { + go g.run(g.Task[k]) + }(key) } + g.Wait() } func (g *GoGrpc) Wait() { @@ -75,25 +83,29 @@ func (g *GoGrpc) Wait() { } func (g *GoGrpc) AddTask(task *GrpcTask) { - g.mu.Lock() - defer g.mu.Unlock() + // g.mu.Lock() + // defer g.mu.Unlock() g.Task[task.Name] = task g.wait.Add(1) } func (g *GoGrpc) AddNewTask(grpcName string, grpcMethod any, request any) { - g.mu.Lock() - defer g.mu.Unlock() - zap.S() - task := GrpcTask{ - ctx: &g.ctx, + // g.mu.Lock() + // defer g.mu.Unlock() + + if grpcName == "" { + grpcName = node.Generate().String() + } + + task := &GrpcTask{ + ctx: g.ctx, grpcMethod: grpcMethod, request: request, Name: grpcName, - log: zap.S().Named(grpcName), + log: zap.S(), } - g.Task[node.Generate().String()] = &task + g.Task[task.Name] = task g.wait.Add(1) return } @@ -103,10 +115,12 @@ func (g *GoGrpc) run(t *GrpcTask) { for { select { case <-g.ctx.Done(): + t.log.Info("context done") t.Err = errors.New("context canceled") return default: t.Call() + t.log.Info("success call function") return } } diff --git a/grpcrun/grpcrun_test.go b/grpcrun/grpcrun_test.go index 4c36390..bb8d945 100644 --- a/grpcrun/grpcrun_test.go +++ b/grpcrun/grpcrun_test.go @@ -6,6 +6,7 @@ import ( "fmt" "go.uber.org/zap" "strconv" + "sync" "testing" "time" @@ -66,6 +67,13 @@ func Login4(ctx context.Context, req *loginReq) (*loginResp, int) { return &loginResp{UserId: 21, Token: "test grpc call success"}, 1 } +func Login5(ctx context.Context, req *loginReq) (*loginResp, error) { + fmt.Println("sleep: ", req) + time.Sleep(4 * time.Second) + fmt.Println("over: ", req) + return &loginResp{UserId: 2333, Token: "test grpc call success"}, nil +} + var ( datas []*data ) @@ -73,7 +81,7 @@ var ( func TestGrpcTask(t *testing.T) { for i, d := range datas { - call := grpcrun.NewGrpcTask(&d.ctx, "test{"+strconv.Itoa(i)+"}", d.method, d.req) + call := grpcrun.NewGrpcTask(d.ctx, "test{"+strconv.Itoa(i)+"}", d.method, d.req) call.Call() t.Logf("第 %d 次执行\n", i+1) @@ -108,17 +116,18 @@ func init() { // 测试表格 datas = []*data{ - newData(ctx, Login, req), // 正常 - newData(ctx, Login1, req), // [grpcMethod]必须有2个参数(context.Context, *request) - newData(ctx, Login2, req), // [grpcMethod]的第1个参数必须是:context.Context - newData(ctx, Login3, req), // [grpcMethod]必须有2个返回值(*Response, error) - newData(ctx, Login4, req), // [grpcMethod]的第2个返回值必须是:error - newData(nil, Login, req), // 请正确的传递[Context],不支持:nil - newData(ctx, nil, req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:invalid - newData(ctx, Login, nil), // 请正确的传递[request],不支持:invalid - newData(ctx, "其他类型", req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:string - newData(ctx, Login, "其他类型"), // 请正确的传入[request],不支持:string - newData(ctx, Login, zap.S()), // [request]的参数与[grpcMethod]的参数不匹配:grpcMethod = v3_test.loginReq, request = zap.SugaredLogger + newData(ctx, Login, req), // 正常 + newData(ctx, Login1, req), // [grpcMethod]必须有2个参数(context.Context, *request) + newData(ctx, Login2, req), // [grpcMethod]的第1个参数必须是:context.Context + newData(ctx, Login3, req), // [grpcMethod]必须有2个返回值(*Response, error) + newData(ctx, Login4, req), // [grpcMethod]的第2个返回值必须是:error + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(nil, Login, req), // 请正确的传递[Context],不支持:nil + newData(ctx, nil, req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:invalid + newData(ctx, Login, nil), // 请正确的传递[request],不支持:invalid + newData(ctx, "其他类型", req), // [grpcMethod]必须是一个GRPC的函数类型,现在是:string + newData(ctx, Login, "其他类型"), // 请正确的传入[request],不支持:string + newData(ctx, Login, zap.S()), // [request]的参数与[grpcMethod]的参数不匹配:grpcMethod = v3_test.loginReq, request = zap.SugaredLogger } } @@ -148,3 +157,35 @@ func TestGoGrpc_Run(t *testing.T) { fmt.Println() } } + +func TestGoGrpc_Timeout(t *testing.T) { + run := grpcrun.NewGoGrpc() + for i, d := range datas { + run.AddNewTask("test{"+strconv.Itoa(i)+"}", d.method, d.req) + } + + run.Run() + + for k, t := range run.Task { + if t.Err != nil { + fmt.Println(k, t.Err.(error)) + } + } + // fmt.Println(run.Task["test{5}"].Err.(error)) +} + +func TestGo(t *testing.T) { + type muNum struct { + mu sync.Mutex + num int + } + n := muNum{num: 1} + for i := 1; i <= 10; i++ { + go func(num int) { + time.Sleep(time.Second) + n.num = num + fmt.Println(num) + }(i) + } + time.Sleep(2 * time.Second) +} diff --git a/grpcrun/task.go b/grpcrun/task.go index e718251..235e964 100644 --- a/grpcrun/task.go +++ b/grpcrun/task.go @@ -14,7 +14,7 @@ type GrpcTask struct { grpcMethod any // GRPC的调用参数 - ctx *context.Context + ctx context.Context request any // GRPC的调用返回值 @@ -30,10 +30,9 @@ type GrpcTask struct { // // Note: // @param grpcName string name of the grpc, this should be unique -func NewGrpcTask(ctx *context.Context, grpcName string, grpcMethod any, request any) *GrpcTask { +func NewGrpcTask(ctx context.Context, grpcName string, grpcMethod any, request any) *GrpcTask { mu.Lock() defer mu.Unlock() - zap.S() if grpcName == "" { grpcName = node.Generate().String() @@ -66,7 +65,7 @@ func (c *GrpcTask) call() { // 调用参数 argv := make([]reflect.Value, 2) - argv[0] = reflect.ValueOf(*c.ctx) + argv[0] = reflect.ValueOf(c.ctx) argv[1] = reflect.ValueOf(c.request) // 反射调用 @@ -90,7 +89,7 @@ func (c *GrpcTask) validate() { } // 校验 ctx 类型 - ctxV := reflect.ValueOf(c.ctx).Elem() + ctxV := reflect.ValueOf(&c.ctx).Elem() if ctxV.IsNil() { c.Err = fmt.Errorf("请正确的传递[Context],不支持:nil") return From 0b89f2f01479fc3788dc43ee608d01679054378c Mon Sep 17 00:00:00 2001 From: novice2194 <2194150786@qq.com> Date: Sun, 26 Feb 2023 18:55:12 +0800 Subject: [PATCH 2/4] perf: performance optimization modified: grpcrun/go_grpc.go modified: grpcrun/grpcrun_test.go --- grpcrun/go_grpc.go | 6 ++---- grpcrun/grpcrun_test.go | 20 +++++++++++++++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/grpcrun/go_grpc.go b/grpcrun/go_grpc.go index ac34f4a..bc7b8de 100644 --- a/grpcrun/go_grpc.go +++ b/grpcrun/go_grpc.go @@ -69,10 +69,8 @@ func (g *GoGrpc) SetTimeout(timeout time.Duration) { } func (g *GoGrpc) Run() { - for key, _ := range g.Task { - go func(k string) { - go g.run(g.Task[k]) - }(key) + for _, task := range g.Task { + go g.run(task) } g.Wait() } diff --git a/grpcrun/grpcrun_test.go b/grpcrun/grpcrun_test.go index bb8d945..f894f9e 100644 --- a/grpcrun/grpcrun_test.go +++ b/grpcrun/grpcrun_test.go @@ -69,13 +69,14 @@ func Login4(ctx context.Context, req *loginReq) (*loginResp, int) { func Login5(ctx context.Context, req *loginReq) (*loginResp, error) { fmt.Println("sleep: ", req) - time.Sleep(4 * time.Second) + time.Sleep(time.Second) fmt.Println("over: ", req) return &loginResp{UserId: 2333, Token: "test grpc call success"}, nil } var ( - datas []*data + datas []*data + timeouts []*data ) func TestGrpcTask(t *testing.T) { @@ -130,6 +131,19 @@ func init() { newData(ctx, Login, zap.S()), // [request]的参数与[grpcMethod]的参数不匹配:grpcMethod = v3_test.loginReq, request = zap.SugaredLogger } + + timeouts = []*data{ + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + newData(ctx, Login5, req), // [grpcMethod]的 timeout + } } func TestGoGrpc_AddNewTask(t *testing.T) { @@ -160,7 +174,7 @@ func TestGoGrpc_Run(t *testing.T) { func TestGoGrpc_Timeout(t *testing.T) { run := grpcrun.NewGoGrpc() - for i, d := range datas { + for i, d := range timeouts { run.AddNewTask("test{"+strconv.Itoa(i)+"}", d.method, d.req) } From 7119128edef2c83460a662d0b7d3060b7550205b Mon Sep 17 00:00:00 2001 From: novice2194 <2194150786@qq.com> Date: Sun, 26 Feb 2023 19:59:33 +0800 Subject: [PATCH 3/4] perf: add mutex --- grpcrun/go_grpc.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/grpcrun/go_grpc.go b/grpcrun/go_grpc.go index bc7b8de..342e9b0 100644 --- a/grpcrun/go_grpc.go +++ b/grpcrun/go_grpc.go @@ -41,7 +41,7 @@ func init() { // run.Wait() // } type GoGrpc struct { - // mu sync.Mutex + mu sync.Mutex ctx context.Context cancel context.CancelFunc wait sync.WaitGroup @@ -53,7 +53,7 @@ func NewGoGrpc() *GoGrpc { mu.Lock() defer mu.Unlock() g := GoGrpc{} - // g.mu = sync.Mutex{} + g.mu = sync.Mutex{} g.time = 3 * time.Second g.wait = sync.WaitGroup{} g.Task = make(map[string]*GrpcTask, 0) @@ -63,8 +63,8 @@ func NewGoGrpc() *GoGrpc { // SetTimeout reset timeout, replace default timeout with a special time duration func (g *GoGrpc) SetTimeout(timeout time.Duration) { - mu.Lock() - defer mu.Unlock() + g.mu.Lock() + defer g.mu.Unlock() g.time = timeout } @@ -81,15 +81,15 @@ func (g *GoGrpc) Wait() { } func (g *GoGrpc) AddTask(task *GrpcTask) { - // g.mu.Lock() - // defer g.mu.Unlock() + g.mu.Lock() + defer g.mu.Unlock() g.Task[task.Name] = task g.wait.Add(1) } func (g *GoGrpc) AddNewTask(grpcName string, grpcMethod any, request any) { - // g.mu.Lock() - // defer g.mu.Unlock() + g.mu.Lock() + defer g.mu.Unlock() if grpcName == "" { grpcName = node.Generate().String() From 02dbe91e952f4fa65efe35037e1e3673ffec562a Mon Sep 17 00:00:00 2001 From: novice2194 <2194150786@qq.com> Date: Sun, 26 Feb 2023 22:28:00 +0800 Subject: [PATCH 4/4] docu/ReadMe --- README.md | 65 ++++++++++++++++++++++++++++++++++++++++++++-- grpcrun/go_grpc.go | 5 +++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 667059b..3d1ae2c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,66 @@ + # grpc goroutine +Running multiple grpc calls separately into goroutine. -* 利用go的并发能力,将GRPC放置在后台运行。 -* 统一封装,适用与所有的GRPC调用 +If you want to run multiple grpc calls into goroutine, you might be like this: +```go +// goroutine 1 +go func(c context.Context) { + defer wait.Done() + for { + select { + case <-ctx.Done(): + return + default: + grpc.ServiceClient() + } + } +}(ctx) +// goroutine 2 +go func(c context.Context) { + defer wait.Done() + for { + select { + case <-ctx.Done(): + return + default: + grpc.ServiceClient() + } + } +}(ctx) +··· +``` +but now you can like this: +```go +run := GoGrpc{} +run.AddNewTask("grpcName", grpcMethod, grpcRequest) +run.Run() +run.Wait() +``` +**Note: grpcName must is a unique value** + +## use +Simple example: +```go +func example() { + run := GoGrpc{} + run.AddNewTask("grpcName", grpcMethod, grpcRequest) + run.Run() + run.Wait() +} +``` +Or you could use NewGrpcTask creat a grpc task +```go +func example() { + run := GoGrpc{} + task := &NewGrpcTask{ctx, "grpcName", grpcMethod, grpcRequest} + run.AddTask(task) + run.Run() + run.Wait() +} +``` diff --git a/grpcrun/go_grpc.go b/grpcrun/go_grpc.go index 342e9b0..f0522bd 100644 --- a/grpcrun/go_grpc.go +++ b/grpcrun/go_grpc.go @@ -49,6 +49,7 @@ type GoGrpc struct { Task map[string]*GrpcTask } +// NewGoGrpc return a GoGrpc Pointer func NewGoGrpc() *GoGrpc { mu.Lock() defer mu.Unlock() @@ -61,13 +62,14 @@ func NewGoGrpc() *GoGrpc { return &g } -// SetTimeout reset timeout, replace default timeout with a special time duration +// SetTimeout reset timeout, replace default timeout with a special time func (g *GoGrpc) SetTimeout(timeout time.Duration) { g.mu.Lock() defer g.mu.Unlock() g.time = timeout } +// Run running all tasks separately in goroutine func (g *GoGrpc) Run() { for _, task := range g.Task { go g.run(task) @@ -75,6 +77,7 @@ func (g *GoGrpc) Run() { g.Wait() } +// Wait blocks until the goroutine is stopped func (g *GoGrpc) Wait() { defer g.cancel() g.wait.Wait()