diff --git a/client.go b/client.go index e59fb0b..3db0cc7 100644 --- a/client.go +++ b/client.go @@ -1,8 +1,13 @@ package turnpike import ( + "encoding/json" "fmt" + "reflect" + "strings" "time" + "unicode" + "unicode/utf8" ) var ( @@ -36,9 +41,16 @@ type Client struct { listeners map[ID]chan Message events map[ID]*eventDesc procedures map[ID]*procedureDesc + serviceMap map[string]*service requestCount uint } +type service struct { + name string + procedures map[string]reflect.Method + topics map[string]reflect.Method +} + type procedureDesc struct { name string handler MethodHandler @@ -510,3 +522,237 @@ func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]in return result, nil } } + +// RegisterService registers in the dealer the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - at least one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. +// The client accesses each method using a string of the form "type.method", +// where type is the receiver's concrete type. +func (c *Client) RegisterService(rcvr interface{}) error { + return c.registerService(rcvr, "", false) +} + +// RegisterServiceName is like RegisterService but uses the provided name for the type +// instead of the receiver's concrete type. +func (c *Client) RegisterServiceName(name string, rcvr interface{}) error { + return c.registerService(rcvr, name, true) +} + +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +func (c *Client) registerService(rcvr interface{}, name string, useName bool) error { + if c.serviceMap == nil { + c.serviceMap = make(map[string]*service) + } + typ := reflect.TypeOf(rcvr) + val := reflect.ValueOf(rcvr) + sname := reflect.Indirect(val).Type().Name() + if name != "" { + sname = name + } + if !isExported(sname) && !useName { + return fmt.Errorf("type %s is not exported", sname) + } + if _, present := c.serviceMap[sname]; present { + return fmt.Errorf("service %s is already defined", sname) + } + + procedures := make(map[string]reflect.Method) + topics := make(map[string]reflect.Method) + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + mtype := method.Type + mname := method.Name + // Method must be exported. + if method.PkgPath != "" { + continue + } + + // Method needs at least one out to be a procedure + numOut := mtype.NumOut() + if numOut == 0 { + // A method with zero outs must start with On to be a event listener + if strings.HasPrefix(mname, "On") { + topics[mname] = method + } + continue + } + + // Method last out must be error + if errType := mtype.Out(numOut - 1); errType != typeOfError { + continue + } + + procedures[mname] = method + } + + // Register methods as procedures with Dealer + for mname, value := range procedures { + procedure := value + namespace := sname + "." + mname + f := func(args []interface{}, kwargs map[string]interface{}, details map[string]interface{}) (callResult *CallResult) { + methodArgs := procedure.Type.NumIn() - 1 + if methodArgs != len(args) { + err := fmt.Errorf("method %s has %d inputs, was called with %d arguments as %s procedure ", procedure.Name, methodArgs, len(args), namespace) + return &CallResult{ + Args: []interface{}{err.Error()}, + Err: ErrInvalidArgument, + } + } + values := make([]reflect.Value, len(args)) + var err error + for i, arg := range args { + in := procedure.Type.In(i + 1) + values[i], err = decodeArgument(in, arg) + if err != nil { + return &CallResult{ + Args: []interface{}{err.Error()}, + Err: ErrInvalidArgument, + } + } + } + values = append([]reflect.Value{val}, values...) + function := procedure.Func + returnValues := function.Call(values) + + result := make([]interface{}, len(returnValues)) + for i := range returnValues { + result[i] = returnValues[i].Interface() + } + return &CallResult{ + Args: result, + } + } + if err := c.Register(namespace, f, map[string]interface{}{}); err != nil { + return err + } + } + + // Subscribe methods to topics with the broker + for mname, value := range topics { + topic := value + namespace := sname + "." + mname + f := func(args []interface{}, kwargs map[string]interface{}) { + methodArgs := topic.Type.NumIn() - 1 + if methodArgs != len(args) { + log.Printf("event %s has %d inputs, was published with %d arguments", namespace, methodArgs, len(args)) + return + } + values := make([]reflect.Value, len(args)) + var err error + for i, arg := range args { + in := topic.Type.In(i + 1) + values[i], err = decodeArgument(in, arg) + if err != nil { + log.Println(err) + return + } + } + values = append([]reflect.Value{val}, values...) + topic.Func.Call(values) + } + if err := c.Subscribe(namespace, f); err != nil { + return err + } + } + c.serviceMap[sname] = &service{ + procedures: procedures, + topics: topics, + name: sname, + } + return nil +} + +func decodeArgument(target reflect.Type, arg interface{}) (reflect.Value, error) { + if arg == nil { + return reflect.Zero(target), nil + } + val := reflect.New(target) + b, err := json.Marshal(arg) + if err != nil { + return val, err + } + err = json.Unmarshal(b, val.Interface()) + v := reflect.Indirect(val) + return v, err +} + +// UnregisterService unsubscribes the service event listeners and unregisters the service procedures. +func (c *Client) UnregisterService(name string) error { + service, present := c.serviceMap[name] + if !present { + return fmt.Errorf("service %s is not defined", name) + } + errs := newErrCol() + for procedureName := range service.procedures { + err := c.Unregister(service.name + "." + procedureName) + errs.IfErrAppend(err) + } + for topicName := range service.topics { + err := c.Unsubscribe(service.name + "." + topicName) + errs.IfErrAppend(err) + } + c.serviceMap[name] = nil + if len(errs) == 0 { + return nil + } + return errs +} + +// CallService invokes the service method 'namespace' with arguments 'args'. +// The return values are saved in 'replies'. +func (c *Client) CallService(namespace string, args []interface{}, replies ...interface{}) error { + for i := 0; i < len(replies); i++ { + if replies[i] == nil { + continue + } + kind := reflect.ValueOf(replies[i]).Kind() + if kind != reflect.Ptr { + return fmt.Errorf("reply[%d] type %s is not a pointer", i, kind) + } + } + + res, err := c.Call(namespace, args, nil) + if err != nil { + return err + } + returnValues := res.Arguments + if len(returnValues)-1 < len(replies) { + return fmt.Errorf("expected %d return values, got %d", len(replies), len(returnValues)-1) + } + if len(returnValues) == 0 { + return fmt.Errorf("expected at least one return value of type string, nil or error") + } + + for i := 0; i < len(replies); i++ { + if replies[i] == nil { + continue + } + vReplies := reflect.ValueOf(replies[i]) + tReplies := vReplies.Type() + val, err := decodeArgument(tReplies, returnValues[i]) + if err != nil { + return err + } + vReplies.Elem().Set(reflect.Indirect(val)) + } + switch e := returnValues[len(returnValues)-1].(type) { + case string: + return fmt.Errorf("%s", e) + case nil: + return nil + case error: + return e + default: + return fmt.Errorf("expected the last return value to be of type string, nil or error got %T", e) + } +} diff --git a/client_test.go b/client_test.go index 2905aaa..8266273 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,8 @@ package turnpike import ( + "fmt" + "sync" "testing" "time" @@ -112,5 +114,218 @@ func TestRemoteCall(t *testing.T) { }) }) }) + + // RegisterService registers in the dealer the set of methods of the + // receiver value that satisfy the following conditions: + // - exported method of exported type + // - two arguments, both of exported type + // - at least one return value, of type error + // It returns an error if the receiver is not an exported type or has + // no suitable methods. + // The client accesses each method using a string of the form "type.method", + // where type is the receiver's concrete type. + Convey("The callee registers an invalid service", func() { + Convey("the type is not a struct", func() { + s := "invalid service" + err := callee.RegisterService(s) + Convey("Should result in an error", func() { + So(err, ShouldNotBeNil) + }) + }) + Convey("the type is not exported", func() { + s := &struct{}{} + err := callee.RegisterService(s) + Convey("Should result in an error", func() { + So(err, ShouldNotBeNil) + }) + }) + Convey("no method is exported", func() { + s := &noMethodExportedService{} + err := callee.RegisterService(s) + Convey("Should result in an error", func() { + So(err, ShouldNotBeNil) + }) + }) + Convey("exported method has no return value", func() { + s := &noReturnValueService{} + err := callee.RegisterService(s) + Convey("Should result in an error", func() { + So(err, ShouldNotBeNil) + }) + }) + Convey("exported method has no return value of type error", func() { + s := &noReturnValueOfTypeErrorService{} + err := callee.RegisterService(s) + Convey("Should result in an error", func() { + So(err, ShouldNotBeNil) + }) + }) + }) + + Convey("The callee registers a valid service", func() { + s := &ValidService{name: "ValidService"} + err := callee.RegisterService(s) + Convey("and expects no error", func() { + So(err, ShouldBeNil) + + Convey("The caller calls the Ping method of the service", func() { + var message string + err := caller.CallService("ValidService.Ping", nil, &message) + Convey("and expects no error", func() { + So(err, ShouldBeNil) + }) + Convey("and expects the message to be 'pong'", func() { + So(message, ShouldEqual, "pong") + }) + }) + + Convey("The caller calls the Echo method of the service", func() { + var message string + err := caller.CallService("ValidService.Echo", []interface{}{"echo"}, &message) + Convey("and expects no error", func() { + So(err, ShouldBeNil) + }) + Convey("and expects the message to be 'echo'", func() { + So(message, ShouldEqual, "echo") + }) + }) + + Convey("The caller calls the Info method of the service", func() { + var info *ServiceInfo + err := caller.CallService("ValidService.Info", nil, &info) + Convey("and expects no error", func() { + So(err, ShouldBeNil) + }) + Convey("and expects the info to be &ServiceInfo{ServiceName:'ValidService'}", func() { + So(info, ShouldResemble, &ServiceInfo{ServiceName: "ValidService"}) + }) + }) + + Convey("The caller calls the SetInfo method of the service", func() { + info := &ServiceInfo{ + ServiceName: "NewServiceName", + } + s.m.Lock() + err := caller.CallService("ValidService.SetInfo", []interface{}{info}) + Convey("and expects no error", func() { + So(err, ShouldBeNil) + }) + Convey("and expects the value of s.name to be 'NewServiceName'", func() { + s.m.Lock() + So(s.name, ShouldEqual, "NewServiceName") + s.m.Unlock() + }) + }) + + Convey("The caller calls the Error method of the service", func() { + err := caller.CallService("ValidService.Error", nil) + Convey("and expects an error", func() { + So(err, ShouldNotBeNil) + So(err.Error(), ShouldEqual, "Error") + }) + }) + + Convey("The caller calls the Simple method of the service", func() { + var st SimpleType + err := caller.CallService("ValidService.SimpleType", nil, &st) + Convey("and expects an error", func() { + So(err, ShouldBeNil) + So(st, ShouldEqual, SimpleType("simple")) + }) + }) + + Convey("The caller publishes an event", func() { + info := &ServiceInfo{ + ServiceName: "EventChangedName", + } + s.m.Lock() + err := caller.Publish("ValidService.OnNewInfo", []interface{}{info}, map[string]interface{}{}) + Convey("and expects no error", func() { + So(err, ShouldBeNil) + }) + Convey("and expects the value of s.name to be 'EventChangedName'", func() { + s.m.Lock() + So(s.name, ShouldEqual, "EventChangedName") + s.m.Unlock() + }) + }) + }) + + Convey("The callee unregisters the service", func() { + err := callee.UnregisterService("ValidService") + Convey("and expects no error", func() { + So(err, ShouldBeNil) + Convey("The caller calls the Error method of the service", func() { + err := caller.CallService("ValidService.Error", nil) + Convey("and expects an error", func() { + So(err, ShouldNotBeNil) + }) + }) + }) + }) + }) + + Convey("The callee unregisters a undefined service", func() { + err := callee.Unregister("ServiceIsNotDefined") + Convey("and expects an error", func() { + So(err, ShouldNotBeNil) + }) + }) }) } + +type noMethodExportedService struct{} + +func (s *noMethodExportedService) notExported() error { return nil } + +type noReturnValueService struct{} + +func (s *noReturnValueService) NoReturnValue() {} + +type noReturnValueOfTypeErrorService struct{} + +func (s *noReturnValueOfTypeErrorService) NoReturnValue() string { return "" } + +type SimpleType string + +type ValidService struct { + name string + m sync.Mutex +} + +func (s *ValidService) Ping() (string, error) { + return "pong", nil +} + +func (s *ValidService) Echo(message string) (string, error) { + return message, nil +} + +func (s *ValidService) Info() (*ServiceInfo, error) { + return &ServiceInfo{ + ServiceName: s.name, + }, nil +} + +func (s *ValidService) OnNewInfo(info *ServiceInfo) { + s.name = info.ServiceName + s.m.Unlock() +} + +func (s *ValidService) SetInfo(info *ServiceInfo) error { + s.name = info.ServiceName + s.m.Unlock() + return nil +} + +func (s *ValidService) SimpleType() (SimpleType, error) { + return SimpleType("simple"), nil +} + +func (s *ValidService) Error() error { + return fmt.Errorf("%s", "Error") +} + +type ServiceInfo struct { + ServiceName string +} diff --git a/util.go b/util.go index cd067c6..c99208b 100644 --- a/util.go +++ b/util.go @@ -1,6 +1,8 @@ package turnpike import ( + "bytes" + "fmt" "math/rand" "time" ) @@ -80,3 +82,38 @@ func init() { func NewID() ID { return ID(rand.Int63n(maxID)) } + +func newErrCol() errCol { + return make(errCol, 0) +} + +type errCol []error + +func (e *errCol) IfErrAppend(err error) bool { + if err != nil { + *e = append(*e, err) + return true + } + return false +} + +func (e errCol) Error() string { + if len(e) == 0 { + return "" + } + var buf bytes.Buffer + if len(e) == 1 { + buf.WriteString("1 error: ") + } else { + fmt.Fprintf(&buf, "%d errors: ", len(e)) + } + + for i, err := range e { + if i != 0 { + buf.WriteString("; ") + } + buf.WriteString(err.Error()) + } + + return buf.String() +}