Skip to content

Commit

Permalink
Merge branch 'develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
jizhuozhi authored Jun 18, 2024
2 parents c048edc + a45de35 commit d0afde6
Show file tree
Hide file tree
Showing 40 changed files with 821 additions and 952 deletions.
14 changes: 9 additions & 5 deletions client/genericclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ Client = &genericServiceClient{}

// NewClient create a generic client
func NewClient(destService string, g generic.Generic, opts ...client.Option) (Client, error) {
svcInfo := generic.ServiceInfo(g.PayloadCodecType())
svcInfo := generic.ServiceInfoWithCodec(g)
return NewClientWithServiceInfo(destService, g, svcInfo, opts...)
}

Expand All @@ -47,6 +47,7 @@ func NewClientWithServiceInfo(destService string, g generic.Generic, svcInfo *se
return nil, err
}
cli := &genericServiceClient{
svcInfo: svcInfo,
kClient: kc,
g: g,
}
Expand Down Expand Up @@ -86,24 +87,27 @@ type Client interface {
}

type genericServiceClient struct {
svcInfo *serviceinfo.ServiceInfo
kClient client.Client
g generic.Generic
}

func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error) {
ctx = client.NewCtxWithCallOptions(ctx, callOptions)
var _args generic.Args
_args := gc.svcInfo.MethodInfo(method).NewArgs().(*generic.Args)
_args.Method = method
_args.Request = request

mt, err := gc.g.GetMethod(request, method)
if err != nil {
return nil, err
}
if mt.Oneway {
return nil, gc.kClient.Call(ctx, mt.Name, &_args, nil)
return nil, gc.kClient.Call(ctx, mt.Name, _args, nil)
}
var _result generic.Result
if err = gc.kClient.Call(ctx, mt.Name, &_args, &_result); err != nil {

_result := gc.svcInfo.MethodInfo(method).NewResult().(*generic.Result)
if err = gc.kClient.Call(ctx, mt.Name, _args, _result); err != nil {
return
}
return _result.GetSuccess(), nil
Expand Down
7 changes: 4 additions & 3 deletions client/rpctimeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,20 @@ func makeTimeoutErr(ctx context.Context, start time.Time, timeout time.Duration)
return kerrors.ErrRPCTimeout.WithCause(fmt.Errorf("%s: %w by business", errMsg, ctx.Err()))
}
}

if ddl, ok := ctx.Deadline(); !ok {
errMsg = fmt.Sprintf("%s, %s", errMsg, "unknown error: context deadline not set?")
} else {
// Go's timer implementation is not so accurate,
// so if we need to check ctx deadline earlier than our timeout, we should consider the accuracy
if timeout <= 0 {
isBizTimeout := isBusinessTimeout(start, timeout, ddl, rpctimeout.LoadBusinessTimeoutThreshold())
if isBizTimeout {
// if timeout set in context is shorter than RPCTimeout in rpcinfo, it will trigger earlier.
errMsg = fmt.Sprintf("%s, timeout by business, actual=%s", errMsg, ddl.Sub(start))
} else if roundTimeout := timeout - time.Millisecond; roundTimeout >= 0 && ddl.Before(start.Add(roundTimeout)) {
errMsg = fmt.Sprintf("%s, context deadline earlier than timeout, actual=%v", errMsg, ddl.Sub(start))
}

if needFineGrainedErrCode && isBusinessTimeout(start, timeout, ddl, rpctimeout.LoadBusinessTimeoutThreshold()) {
if needFineGrainedErrCode && isBizTimeout {
return kerrors.ErrTimeoutByBusiness.WithCause(errors.New(errMsg))
}
}
Expand Down
11 changes: 11 additions & 0 deletions client/rpctimeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ func TestRpcTimeoutMWTimeoutByBusiness(t *testing.T) {
}
rpctimeout.DisableGlobalNeedFineGrainedErrCode()
})

t.Run("Timeout by business with shorter timeout than RPCTimeout in rpcinfo", func(t *testing.T) {
rpctimeout.DisableGlobalNeedFineGrainedErrCode()
err := runTimeoutMW(time.Second)
if !kerrors.IsTimeoutError(err) {
t.Errorf("rpcTimeoutMW(1s) = %v, want %v", err, kerrors.ErrRPCTimeout)
}
if errMsg := err.Error(); !strings.Contains(errMsg, "timeout by business, actual") {
t.Errorf("rpcTimeoutMW(1s) = %v, want error msg with %v", errMsg, "timeout by business, actual")
}
})
}

func TestRpcTimeoutMWCancelByBusiness(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/mocks/generic/thrift.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 66 additions & 37 deletions pkg/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ import (
type Generic interface {
Closer
// PayloadCodec return codec implement
// this is used for generic which does not need IDL
PayloadCodec() remote.PayloadCodec
// PayloadCodecType return the type of codec
PayloadCodecType() serviceinfo.PayloadCodec
// RawThriftBinaryGeneric must be framed
Framed() bool
// GetMethod to get method name if need
// GetMethod is to get method name if needed
GetMethod(req interface{}, method string) (*Method, error)
// IDLServiceName returns idl service name
IDLServiceName() string
// MessageReaderWriter returns reader and writer
// this is used for generic which needs IDL
MessageReaderWriter() interface{}
}

// Method information
Expand All @@ -64,22 +70,14 @@ func BinaryThriftGeneric() Generic {
//
// SetBinaryWithByteSlice(g, true)
func MapThriftGeneric(p DescriptorProvider) (Generic, error) {
codec, err := newMapThriftCodec(p, thriftCodec)
if err != nil {
return nil, err
}
return &mapThriftGeneric{
codec: codec,
codec: newMapThriftCodec(p),
}, nil
}

func MapThriftGenericForJSON(p DescriptorProvider) (Generic, error) {
codec, err := newMapThriftCodecForJSON(p, thriftCodec)
if err != nil {
return nil, err
}
return &mapThriftGeneric{
codec: codec,
codec: newMapThriftCodecForJSON(p),
}, nil
}

Expand All @@ -92,20 +90,12 @@ func MapThriftGenericForJSON(p DescriptorProvider) (Generic, error) {
func HTTPThriftGeneric(p DescriptorProvider, opts ...Option) (Generic, error) {
gOpts := &Options{dynamicgoConvOpts: DefaultHTTPDynamicGoConvOpts}
gOpts.apply(opts)
codec, err := newHTTPThriftCodec(p, thriftCodec, gOpts)
if err != nil {
return nil, err
}
return &httpThriftGeneric{codec: codec}, nil
return &httpThriftGeneric{codec: newHTTPThriftCodec(p, gOpts)}, nil
}

func HTTPPbThriftGeneric(p DescriptorProvider, pbp PbDescriptorProvider) (Generic, error) {
codec, err := newHTTPPbThriftCodec(p, pbp, thriftCodec)
if err != nil {
return nil, err
}
return &httpPbThriftGeneric{
codec: codec,
codec: newHTTPPbThriftCodec(p, pbp),
}, nil
}

Expand All @@ -118,24 +108,15 @@ func HTTPPbThriftGeneric(p DescriptorProvider, pbp PbDescriptorProvider) (Generi
func JSONThriftGeneric(p DescriptorProvider, opts ...Option) (Generic, error) {
gOpts := &Options{dynamicgoConvOpts: DefaultJSONDynamicGoConvOpts}
gOpts.apply(opts)
codec, err := newJsonThriftCodec(p, thriftCodec, gOpts)
if err != nil {
return nil, err
}
return &jsonThriftGeneric{codec: codec}, nil
return &jsonThriftGeneric{codec: newJsonThriftCodec(p, gOpts)}, nil
}

// JSONPbGeneric json mapping generic.
// Uses dynamicgo for json to protobufs conversion, so DynamicGo field is true by default.
func JSONPbGeneric(p PbDescriptorProviderDynamicGo, opts ...Option) (Generic, error) {
gOpts := &Options{dynamicgoConvOpts: conv.Options{}}
gOpts.apply(opts)

codec, err := newJsonPbCodec(p, pbCodec, gOpts)
if err != nil {
return nil, err
}
return &jsonPbGeneric{codec: codec}, nil
return &jsonPbGeneric{codec: newJsonPbCodec(p, gOpts)}, nil
}

// SetBinaryWithBase64 enable/disable Base64 codec for binary field.
Expand Down Expand Up @@ -243,6 +224,14 @@ func (g *binaryThriftGeneric) Close() error {
return nil
}

func (g *binaryThriftGeneric) IDLServiceName() string {
return ""
}

func (g *binaryThriftGeneric) MessageReaderWriter() interface{} {
return nil
}

type mapThriftGeneric struct {
codec *mapThriftCodec
}
Expand All @@ -256,7 +245,7 @@ func (g *mapThriftGeneric) PayloadCodecType() serviceinfo.PayloadCodec {
}

func (g *mapThriftGeneric) PayloadCodec() remote.PayloadCodec {
return g.codec
return nil
}

func (g *mapThriftGeneric) GetMethod(req interface{}, method string) (*Method, error) {
Expand All @@ -267,6 +256,14 @@ func (g *mapThriftGeneric) Close() error {
return g.codec.Close()
}

func (g *mapThriftGeneric) IDLServiceName() string {
return g.codec.svcName
}

func (g *mapThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

type jsonThriftGeneric struct {
codec *jsonThriftCodec
}
Expand All @@ -280,7 +277,7 @@ func (g *jsonThriftGeneric) PayloadCodecType() serviceinfo.PayloadCodec {
}

func (g *jsonThriftGeneric) PayloadCodec() remote.PayloadCodec {
return g.codec
return nil
}

func (g *jsonThriftGeneric) GetMethod(req interface{}, method string) (*Method, error) {
Expand All @@ -291,6 +288,14 @@ func (g *jsonThriftGeneric) Close() error {
return g.codec.Close()
}

func (g *jsonThriftGeneric) IDLServiceName() string {
return g.codec.svcName
}

func (g *jsonThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

type jsonPbGeneric struct {
codec *jsonPbCodec
}
Expand All @@ -304,7 +309,7 @@ func (g *jsonPbGeneric) PayloadCodecType() serviceinfo.PayloadCodec {
}

func (g *jsonPbGeneric) PayloadCodec() remote.PayloadCodec {
return g.codec
return nil
}

func (g *jsonPbGeneric) GetMethod(req interface{}, method string) (*Method, error) {
Expand All @@ -315,6 +320,14 @@ func (g *jsonPbGeneric) Close() error {
return g.codec.Close()
}

func (g *jsonPbGeneric) IDLServiceName() string {
return g.codec.svcName
}

func (g *jsonPbGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

type httpThriftGeneric struct {
codec *httpThriftCodec
}
Expand All @@ -328,7 +341,7 @@ func (g *httpThriftGeneric) PayloadCodecType() serviceinfo.PayloadCodec {
}

func (g *httpThriftGeneric) PayloadCodec() remote.PayloadCodec {
return g.codec
return nil
}

func (g *httpThriftGeneric) GetMethod(req interface{}, method string) (*Method, error) {
Expand All @@ -339,6 +352,14 @@ func (g *httpThriftGeneric) Close() error {
return g.codec.Close()
}

func (g *httpThriftGeneric) IDLServiceName() string {
return g.codec.svcName
}

func (g *httpThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}

type httpPbThriftGeneric struct {
codec *httpPbThriftCodec
}
Expand All @@ -352,7 +373,7 @@ func (g *httpPbThriftGeneric) PayloadCodecType() serviceinfo.PayloadCodec {
}

func (g *httpPbThriftGeneric) PayloadCodec() remote.PayloadCodec {
return g.codec
return nil
}

func (g *httpPbThriftGeneric) GetMethod(req interface{}, method string) (*Method, error) {
Expand All @@ -362,3 +383,11 @@ func (g *httpPbThriftGeneric) GetMethod(req interface{}, method string) (*Method
func (g *httpPbThriftGeneric) Close() error {
return g.codec.Close()
}

func (g *httpPbThriftGeneric) IDLServiceName() string {
return g.codec.svcName
}

func (g *httpPbThriftGeneric) MessageReaderWriter() interface{} {
return g.codec.getMessageReaderWriter()
}
Loading

0 comments on commit d0afde6

Please sign in to comment.