Skip to content

Commit

Permalink
Merge pull request #1386 from cloudwego/release-v0.10.0
Browse files Browse the repository at this point in the history
chore: release v0.10.0
  • Loading branch information
ppzqh authored Jun 12, 2024
2 parents 8526b3a + 15964cb commit 22991e2
Show file tree
Hide file tree
Showing 100 changed files with 4,853 additions and 1,236 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.17'
go-version: '1.20'
- name: Unit Test
run: go test -gcflags=-l -race -covermode=atomic -coverprofile=coverage.txt ./...
- name: Scenario Tests
Expand All @@ -31,7 +31,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.17'
go-version: '1.18'
- name: Benchmark
run: go test -gcflags='all=-N -l' -bench=. -benchmem -run=none ./...

Expand All @@ -57,7 +57,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.17'
go-version: '1.19'
- name: Prepare
run: |
go install github.com/cloudwego/thriftgo@main
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ tool/cmd/kitex/kitex

# remote dump file
*.json
base1.go
dump.txt
base2.go
3 changes: 1 addition & 2 deletions CREDITS
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
github.com/stretchr/testify
github.com/choleraehyq/pid
github.com/stretchr/testify
2 changes: 1 addition & 1 deletion client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func WithRetryMethodPolicies(mp map[string]retry.Policy) Option {
// But if your retry policy is enabled by remote config, WithSpecifiedResultRetry is useful.
func WithSpecifiedResultRetry(rr *retry.ShouldResultRetry) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
if rr == nil || (rr.RespRetry == nil && rr.ErrorRetry == nil) {
if rr == nil || !rr.IsValid() {
panic(fmt.Errorf("WithSpecifiedResultRetry: invalid '%+v'", rr))
}
di.Push(fmt.Sprintf("WithSpecifiedResultRetry(%+v)", rr))
Expand Down
12 changes: 6 additions & 6 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,40 @@ func TestRetryOptionDebugInfo(t *testing.T) {
fp.WithDDLStop()
expectPolicyStr := "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr := fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr := fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithFixedBackOff(10)
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:fixed CfgItems:map[fix_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRandomBackOff(10, 20)
fp.DisableChainRetryStop()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRetrySameNode()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetry: func(err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", *fp)
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

bp := retry.NewBackupPolicy(20)
expectPolicyStr = "WithBackupRequest({RetryDelayMS:20 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:false CBPolicy:{ErrorRate:0.1}} RetrySameNode:false})"
policyStr = fmt.Sprintf("WithBackupRequest(%+v)", *bp)
policyStr = fmt.Sprintf("WithBackupRequest(%+v)", bp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)
WithBackupRequest(bp)
}
Expand Down
16 changes: 10 additions & 6 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ func (kc *kClient) invokeStreamingEndpoint() (endpoint.Endpoint, error) {
return func(ctx context.Context, req, resp interface{}) (err error) {
// req and resp as &streaming.Stream
ri := rpcinfo.GetRPCInfo(ctx)
st, err := remotecli.NewStream(ctx, ri, handler, kc.opt.RemoteOpt)
st, scm, err := remotecli.NewStream(ctx, ri, handler, kc.opt.RemoteOpt)
if err != nil {
return
}
clientStream := newStream(st, kc, ri, kc.getStreamingMode(ri), sendEndpoint, recvEndpoint)
clientStream := newStream(st, scm, kc, ri, kc.getStreamingMode(ri), sendEndpoint, recvEndpoint)
resp.(*streaming.Result).Stream = clientStream
return
}, nil
Expand All @@ -107,6 +107,7 @@ func (kc *kClient) getStreamingMode(ri rpcinfo.RPCInfo) serviceinfo.StreamingMod

type stream struct {
stream streaming.Stream
scm *remotecli.StreamConnManager
kc *kClient
ri rpcinfo.RPCInfo

Expand All @@ -118,11 +119,12 @@ type stream struct {

var _ streaming.WithDoFinish = (*stream)(nil)

func newStream(s streaming.Stream, kc *kClient, ri rpcinfo.RPCInfo,
func newStream(s streaming.Stream, scm *remotecli.StreamConnManager, kc *kClient, ri rpcinfo.RPCInfo,
mode serviceinfo.StreamingMode, sendEP endpoint.SendEndpoint, recvEP endpoint.RecvEndpoint,
) *stream {
return &stream{
stream: s,
scm: scm,
kc: kc,
ri: ri,
streamingMode: mode,
Expand Down Expand Up @@ -191,6 +193,7 @@ func (s *stream) Close() error {
}

// DoFinish implements the streaming.WithDoFinish interface, and it records the end of stream
// It will release the connection.
func (s *stream) DoFinish(err error) {
if atomic.SwapUint32(&s.finished, 1) == 1 {
// already called
Expand All @@ -200,9 +203,10 @@ func (s *stream) DoFinish(err error) {
// only rpc errors are reported
err = nil
}
ctx := s.Context()
ri := rpcinfo.GetRPCInfo(ctx)
s.kc.opt.TracerCtl.DoFinish(ctx, ri, err)
if s.scm != nil {
s.scm.ReleaseConn(err, s.ri)
}
s.kc.opt.TracerCtl.DoFinish(s.Context(), s.ri, err)
}

func isRPCError(err error) bool {
Expand Down
89 changes: 71 additions & 18 deletions client/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ func TestStreaming(t *testing.T) {
connpool := mock_remote.NewMockConnPool(ctrl)
connpool.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(conn, nil)
cliInfo.ConnPool = connpool
s, _ := remotecli.NewStream(ctx, mockRPCInfo, new(mocks.MockCliTransHandler), cliInfo)
s, cr, _ := remotecli.NewStream(ctx, mockRPCInfo, new(mocks.MockCliTransHandler), cliInfo)
stream := newStream(
s, kc, mockRPCInfo, serviceinfo.StreamingBidirectional,
s, cr, kc, mockRPCInfo, serviceinfo.StreamingBidirectional,
func(stream streaming.Stream, message interface{}) (err error) {
return stream.SendMsg(message)
},
Expand Down Expand Up @@ -211,9 +211,14 @@ func Test_newStream(t *testing.T) {
TracerCtl: &rpcinfo.TraceController{},
},
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scr := remotecli.NewStreamConnManager(cr)
s := newStream(
st,
scr,
kc,
ri,
serviceinfo.StreamingClient,
Expand Down Expand Up @@ -241,15 +246,20 @@ func (m *mockTracer) Finish(ctx context.Context) {
}

func Test_stream_Header(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

t.Run("no-error", func(t *testing.T) {
headers := metadata.MD{"k": []string{"v"}}
st := &mockStream{
header: func() (metadata.MD, error) {
return headers, nil
},
}

s := newStream(st, &kClient{}, nil, serviceinfo.StreamingBidirectional, nil, nil)
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(0)
scr := remotecli.NewStreamConnManager(cr)
s := newStream(st, scr, &kClient{}, nil, serviceinfo.StreamingBidirectional, nil, nil)
md, err := s.Header()

test.Assert(t, err == nil)
Expand Down Expand Up @@ -279,8 +289,10 @@ func Test_stream_Header(t *testing.T) {
TracerCtl: ctl,
},
}

s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scr := remotecli.NewStreamConnManager(cr)
s := newStream(st, scr, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)
md, err := s.Header()

test.Assert(t, err == headerErr)
Expand All @@ -290,9 +302,15 @@ func Test_stream_Header(t *testing.T) {
}

func Test_stream_RecvMsg(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

t.Run("no-error", func(t *testing.T) {
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(0)
scm := remotecli.NewStreamConnManager(cr)
mockRPCInfo := rpcinfo.NewRPCInfo(nil, nil, rpcinfo.NewInvocation("mock_service", "mock_method"), nil, nil)
s := newStream(&mockStream{}, &kClient{}, mockRPCInfo, serviceinfo.StreamingBidirectional, nil,
s := newStream(&mockStream{}, scm, &kClient{}, mockRPCInfo, serviceinfo.StreamingBidirectional, nil,
func(stream streaming.Stream, message interface{}) (err error) {
return nil
},
Expand Down Expand Up @@ -321,8 +339,11 @@ func Test_stream_RecvMsg(t *testing.T) {
TracerCtl: ctl,
},
}

s := newStream(st, kc, ri, serviceinfo.StreamingClient, nil,
cr := mock_remote.NewMockConnReleaser(ctrl)
// client streaming should release connection after RecvMsg
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(st, scm, kc, ri, serviceinfo.StreamingClient, nil,
func(stream streaming.Stream, message interface{}) (err error) {
return nil
},
Expand Down Expand Up @@ -353,7 +374,10 @@ func Test_stream_RecvMsg(t *testing.T) {
},
}

s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil,
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(st, scm, kc, ri, serviceinfo.StreamingBidirectional, nil,
func(stream streaming.Stream, message interface{}) (err error) {
return recvErr
},
Expand All @@ -366,8 +390,14 @@ func Test_stream_RecvMsg(t *testing.T) {
}

func Test_stream_SendMsg(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

t.Run("no-error", func(t *testing.T) {
s := newStream(&mockStream{}, &kClient{}, nil, serviceinfo.StreamingBidirectional,
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(0)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(&mockStream{}, scm, &kClient{}, nil, serviceinfo.StreamingBidirectional,
func(stream streaming.Stream, message interface{}) (err error) {
return nil
},
Expand All @@ -380,6 +410,9 @@ func Test_stream_SendMsg(t *testing.T) {
})

t.Run("error", func(t *testing.T) {
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
sendErr := errors.New("recv error")
ri := rpcinfo.NewRPCInfo(nil, nil, nil, nil, rpcinfo.NewRPCStats())
st := &mockStream{
Expand All @@ -399,7 +432,7 @@ func Test_stream_SendMsg(t *testing.T) {
},
}

s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional,
s := newStream(st, scm, kc, ri, serviceinfo.StreamingBidirectional,
func(stream streaming.Stream, message interface{}) (err error) {
return sendErr
},
Expand All @@ -413,13 +446,18 @@ func Test_stream_SendMsg(t *testing.T) {
}

func Test_stream_Close(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(0)
scm := remotecli.NewStreamConnManager(cr)
called := false
s := newStream(&mockStream{
close: func() error {
called = true
return nil
},
}, &kClient{}, nil, serviceinfo.StreamingBidirectional, nil, nil)
}, scm, &kClient{}, nil, serviceinfo.StreamingBidirectional, nil, nil)

err := s.Close()

Expand All @@ -428,6 +466,9 @@ func Test_stream_Close(t *testing.T) {
}

func Test_stream_DoFinish(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

t.Run("no-error", func(t *testing.T) {
ri := rpcinfo.NewRPCInfo(nil, nil, nil, nil, rpcinfo.NewRPCStats())
st := &mockStream{
Expand All @@ -441,7 +482,10 @@ func Test_stream_DoFinish(t *testing.T) {
TracerCtl: ctl,
},
}
s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(st, scm, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)

finishCalled := false
err := errors.New("any err")
Expand All @@ -468,7 +512,10 @@ func Test_stream_DoFinish(t *testing.T) {
TracerCtl: ctl,
},
}
s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(st, scm, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)

finishCalled := false
err := errors.New("any err")
Expand All @@ -495,7 +542,10 @@ func Test_stream_DoFinish(t *testing.T) {
TracerCtl: ctl,
},
}
s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(st, scm, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)

finishCalled := false
var err error
Expand All @@ -522,7 +572,10 @@ func Test_stream_DoFinish(t *testing.T) {
TracerCtl: ctl,
},
}
s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)
cr := mock_remote.NewMockConnReleaser(ctrl)
cr.EXPECT().ReleaseConn(gomock.Any(), gomock.Any()).Times(1)
scm := remotecli.NewStreamConnManager(cr)
s := newStream(st, scm, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)

finishCalled := false
expectedErr := errors.New("error")
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ go 1.13

require (
github.com/apache/thrift v0.13.0
github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b
github.com/bytedance/gopkg v0.0.0-20240514070511-01b2cbcf35e1
github.com/bytedance/mockey v1.2.7
github.com/bytedance/sonic v1.11.2
github.com/choleraehyq/pid v0.0.18
github.com/cloudwego/configmanager v0.2.0
github.com/cloudwego/dynamicgo v0.2.0
github.com/bytedance/sonic v1.11.8
github.com/cloudwego/configmanager v0.2.2
github.com/cloudwego/dynamicgo v0.2.8
github.com/cloudwego/fastpb v0.0.4
github.com/cloudwego/frugal v0.1.14
github.com/cloudwego/frugal v0.1.15
github.com/cloudwego/localsession v0.0.2
github.com/cloudwego/netpoll v0.6.0
github.com/cloudwego/netpoll v0.6.1
github.com/cloudwego/runtimex v0.1.0
github.com/cloudwego/thriftgo v0.3.6
github.com/golang/mock v1.6.0
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3
Expand Down
Loading

0 comments on commit 22991e2

Please sign in to comment.