diff --git a/protocol/find_coordinator_request.go b/protocol/find_coordinator_request.go new file mode 100644 index 00000000..53e5151d --- /dev/null +++ b/protocol/find_coordinator_request.go @@ -0,0 +1,34 @@ +package protocol + +type FindCoordinatorRequest struct { + CoordinatorKey string + CoordinatorType int8 +} + +func (r *FindCoordinatorRequest) Encode(e PacketEncoder) (err error) { + if err = e.PutString(r.CoordinatorKey); err != nil { + return err + } + if r.Version() >= 1 { + e.PutInt8(r.CoordinatorType) + } + return nil +} + +func (r *FindCoordinatorRequest) Decode(d PacketDecoder) (err error) { + if r.CoordinatorKey, err = d.String(); err != nil { + return err + } + if r.CoordinatorType, err = d.Int8(); err != nil { + return err + } + return nil +} + +func (r *FindCoordinatorRequest) Version() int16 { + return 1 +} + +func (r *FindCoordinatorRequest) Key() int16 { + return 10 +} diff --git a/protocol/find_coordinator_request_test.go b/protocol/find_coordinator_request_test.go new file mode 100644 index 00000000..f922f812 --- /dev/null +++ b/protocol/find_coordinator_request_test.go @@ -0,0 +1,22 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFindCoordinatorRequest(t *testing.T) { + req := require.New(t) + exp := &FindCoordinatorRequest{ + CoordinatorKey: "coord-key", + CoordinatorType: 1, + } + b, err := Encode(exp) + req.NoError(err) + var act FindCoordinatorRequest + err = Decode(b, &act) + req.NoError(err) + req.Equal(exp, &act) + +} diff --git a/protocol/find_coordinator_response.go b/protocol/find_coordinator_response.go new file mode 100644 index 00000000..5868c6d9 --- /dev/null +++ b/protocol/find_coordinator_response.go @@ -0,0 +1,50 @@ +package protocol + +type Coordinator struct { + NodeID int32 + Host string + Port int32 +} + +type FindCoordinatorResponse struct { + ThrottleTimeMs int32 + ErrorCode int16 + ErrorMessage *string + Coordinator Coordinator +} + +func (r *FindCoordinatorResponse) Encode(e PacketEncoder) (err error) { + e.PutInt32(r.ThrottleTimeMs) + e.PutInt16(r.ErrorCode) + if err = e.PutNullableString(r.ErrorMessage); err != nil { + return err + } + e.PutInt32(r.Coordinator.NodeID) + if err = e.PutString(r.Coordinator.Host); err != nil { + return err + } + e.PutInt32(r.Coordinator.Port) + return nil +} + +func (r *FindCoordinatorResponse) Decode(d PacketDecoder) (err error) { + if r.ThrottleTimeMs, err = d.Int32(); err != nil { + return err + } + if r.ErrorCode, err = d.Int16(); err != nil { + return err + } + if r.ErrorMessage, err = d.NullableString(); err != nil { + return err + } + if r.Coordinator.NodeID, err = d.Int32(); err != nil { + return err + } + if r.Coordinator.Host, err = d.String(); err != nil { + return err + } + if r.Coordinator.Port, err = d.Int32(); err != nil { + return err + } + return nil +} diff --git a/protocol/find_coordinator_response_test.go b/protocol/find_coordinator_response_test.go new file mode 100644 index 00000000..74578377 --- /dev/null +++ b/protocol/find_coordinator_response_test.go @@ -0,0 +1,28 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFindCoordinatorResponse(t *testing.T) { + req := require.New(t) + errMsg := "Shit's broken" + exp := &FindCoordinatorResponse{ + ThrottleTimeMs: 1, + ErrorCode: 2, + ErrorMessage: &errMsg, + Coordinator: Coordinator{ + NodeID: 3, + Host: "localhost", + Port: 4, + }, + } + b, err := Encode(exp) + req.NoError(err) + var act FindCoordinatorResponse + err = Decode(b, &act) + req.NoError(err) + req.Equal(exp, &act) +} diff --git a/protocol/group_coordinator_response.go b/protocol/group_coordinator_response.go index 9e50d134..642c89f2 100644 --- a/protocol/group_coordinator_response.go +++ b/protocol/group_coordinator_response.go @@ -1,11 +1,5 @@ package protocol -type Coordinator struct { - NodeID int32 - Host string - Port int32 -} - type GroupCoordinatorResponse struct { ErrorCode int16 Coordinator *Coordinator