diff --git a/jocko/broker.go b/jocko/broker.go index bc310661..5c239b2e 100644 --- a/jocko/broker.go +++ b/jocko/broker.go @@ -141,7 +141,7 @@ func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses ch queueSpan.Finish() } - switch req := reqCtx.Request.(type) { + switch req := reqCtx.req.(type) { case *protocol.ProduceRequest: response = b.handleProduce(reqCtx, req) case *protocol.FetchRequest: @@ -195,11 +195,11 @@ func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses ch responseCtx := context.WithValue(reqCtx, responseQueueSpanKey, queueSpan) responses <- &Context{ - Parent: responseCtx, - Conn: reqCtx.Conn, - Header: reqCtx.Header, - Response: &protocol.Response{ - CorrelationID: reqCtx.Header.CorrelationID, + parent: responseCtx, + conn: reqCtx.conn, + header: reqCtx.header, + res: &protocol.Response{ + CorrelationID: reqCtx.header.CorrelationID, Body: response, }, } @@ -215,7 +215,7 @@ func (b *Broker) JoinLAN(addrs ...string) protocol.Error { return protocol.ErrNone } -// Request handling. +// req handling. func span(ctx context.Context, tracer opentracing.Tracer, op string) opentracing.Span { if ctx == nil { diff --git a/jocko/broker_test.go b/jocko/broker_test.go index 32b8909a..6da8454d 100644 --- a/jocko/broker_test.go +++ b/jocko/broker_test.go @@ -51,12 +51,12 @@ func TestBroker_Run(t *testing.T) { requestCh: make(chan *Context, 2), responseCh: make(chan *Context, 2), requests: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.APIVersionsRequest{}, + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.APIVersionsRequest{}, }}, responses: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: apiVersions}, + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: apiVersions}, }}, }, }, @@ -66,16 +66,16 @@ func TestBroker_Run(t *testing.T) { requestCh: make(chan *Context, 2), responseCh: make(chan *Context, 2), requests: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ Topic: "the-topic", NumPartitions: 1, ReplicationFactor: 1, }}}}, }, responses: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, }}, }}, @@ -87,16 +87,16 @@ func TestBroker_Run(t *testing.T) { requestCh: make(chan *Context, 2), responseCh: make(chan *Context, 2), requests: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ Topic: "the-topic", NumPartitions: 1, ReplicationFactor: 2, }}}}, }, responses: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrInvalidReplicationFactor.Code()}}, }}, }}, @@ -108,23 +108,23 @@ func TestBroker_Run(t *testing.T) { requestCh: make(chan *Context, 2), responseCh: make(chan *Context, 2), requests: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ Topic: "the-topic", NumPartitions: 1, ReplicationFactor: 1, }}}}, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Request: &protocol.DeleteTopicsRequest{Topics: []string{"the-topic"}}}, + header: &protocol.RequestHeader{CorrelationID: 2}, + req: &protocol.DeleteTopicsRequest{Topics: []string{"the-topic"}}}, }, responses: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Response: &protocol.Response{CorrelationID: 2, Body: &protocol.DeleteTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 2}, + res: &protocol.Response{CorrelationID: 2, Body: &protocol.DeleteTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, }}}}, }, @@ -136,39 +136,39 @@ func TestBroker_Run(t *testing.T) { responseCh: make(chan *Context, 2), requests: []*Context{ { - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ Topic: "the-topic", NumPartitions: 1, ReplicationFactor: 1, }}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ + header: &protocol.RequestHeader{CorrelationID: 2}, + req: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ Topic: "the-topic", Data: []*protocol.Data{{ RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 3}, - Request: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -1}}}}}, + header: &protocol.RequestHeader{CorrelationID: 3}, + req: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -1}}}}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 4}, - Request: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -2}}}}}, + header: &protocol.RequestHeader{CorrelationID: 4}, + req: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -2}}}}}, }, }, responses: []*Context{ { - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ + header: &protocol.RequestHeader{CorrelationID: 2}, + res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ Responses: []*protocol.ProduceResponse{{ Topic: "the-topic", PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}}, @@ -176,8 +176,8 @@ func TestBroker_Run(t *testing.T) { }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 3}, - Response: &protocol.Response{CorrelationID: 3, Body: &protocol.OffsetsResponse{ + header: &protocol.RequestHeader{CorrelationID: 3}, + res: &protocol.Response{CorrelationID: 3, Body: &protocol.OffsetsResponse{ Responses: []*protocol.OffsetResponse{{ Topic: "the-topic", PartitionResponses: []*protocol.PartitionResponse{{Partition: 0, Offsets: []int64{1}, ErrorCode: protocol.ErrNone.Code()}}, @@ -185,8 +185,8 @@ func TestBroker_Run(t *testing.T) { }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 4}, - Response: &protocol.Response{CorrelationID: 4, Body: &protocol.OffsetsResponse{ + header: &protocol.RequestHeader{CorrelationID: 4}, + res: &protocol.Response{CorrelationID: 4, Body: &protocol.OffsetsResponse{ Responses: []*protocol.OffsetResponse{{ Topic: "the-topic", PartitionResponses: []*protocol.PartitionResponse{{Partition: 0, Offsets: []int64{0}, ErrorCode: protocol.ErrNone.Code()}}, @@ -196,7 +196,7 @@ func TestBroker_Run(t *testing.T) { }, }, handle: func(t *testing.T, _ *Broker, ctx *Context) { - switch res := ctx.Response.(*protocol.Response).Body.(type) { + switch res := ctx.res.(*protocol.Response).Body.(type) { // handle timestamp explicitly since we don't know what // it'll be set to case *protocol.ProduceResponses: @@ -211,35 +211,35 @@ func TestBroker_Run(t *testing.T) { responseCh: make(chan *Context, 2), requests: []*Context{ { - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ Topic: "the-topic", NumPartitions: 1, ReplicationFactor: 1, }}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ + header: &protocol.RequestHeader{CorrelationID: 2}, + req: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ Topic: "the-topic", Data: []*protocol.Data{{ RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 3}, - Request: &protocol.FetchRequest{ReplicaID: 1, MinBytes: 5, Topics: []*protocol.FetchTopic{{Topic: "the-topic", Partitions: []*protocol.FetchPartition{{Partition: 0, FetchOffset: 0, MaxBytes: 100}}}}}, + header: &protocol.RequestHeader{CorrelationID: 3}, + req: &protocol.FetchRequest{ReplicaID: 1, MinBytes: 5, Topics: []*protocol.FetchTopic{{Topic: "the-topic", Partitions: []*protocol.FetchPartition{{Partition: 0, FetchOffset: 0, MaxBytes: 100}}}}}, }, }, responses: []*Context{ { - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ + header: &protocol.RequestHeader{CorrelationID: 2}, + res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ Responses: []*protocol.ProduceResponse{ { Topic: "the-topic", @@ -249,8 +249,8 @@ func TestBroker_Run(t *testing.T) { }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 3}, - Response: &protocol.Response{CorrelationID: 3, Body: &protocol.FetchResponses{ + header: &protocol.RequestHeader{CorrelationID: 3}, + res: &protocol.Response{CorrelationID: 3, Body: &protocol.FetchResponses{ Responses: []*protocol.FetchResponse{{ Topic: "the-topic", PartitionResponses: []*protocol.FetchPartitionResponse{{ @@ -265,7 +265,7 @@ func TestBroker_Run(t *testing.T) { }, }, handle: func(t *testing.T, _ *Broker, ctx *Context) { - switch res := ctx.Response.(*protocol.Response).Body.(type) { + switch res := ctx.res.(*protocol.Response).Body.(type) { // handle timestamp explicitly since we don't know what // it'll be set to case *protocol.ProduceResponses: @@ -280,35 +280,35 @@ func TestBroker_Run(t *testing.T) { responseCh: make(chan *Context, 2), requests: []*Context{ { - Header: &protocol.RequestHeader{CorrelationID: 1}, - Request: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ + header: &protocol.RequestHeader{CorrelationID: 1}, + req: &protocol.CreateTopicRequests{Requests: []*protocol.CreateTopicRequest{{ Topic: "the-topic", NumPartitions: 1, ReplicationFactor: 1, }}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ + header: &protocol.RequestHeader{CorrelationID: 2}, + req: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ Topic: "the-topic", Data: []*protocol.Data{{ RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}}, }, { - Header: &protocol.RequestHeader{CorrelationID: 3}, - Request: &protocol.MetadataRequest{Topics: []string{"the-topic", "unknown-topic"}}, + header: &protocol.RequestHeader{CorrelationID: 3}, + req: &protocol.MetadataRequest{Topics: []string{"the-topic", "unknown-topic"}}, }, }, responses: []*Context{ { - Header: &protocol.RequestHeader{CorrelationID: 1}, - Response: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ + header: &protocol.RequestHeader{CorrelationID: 1}, + res: &protocol.Response{CorrelationID: 1, Body: &protocol.CreateTopicsResponse{ TopicErrorCodes: []*protocol.TopicErrorCode{{Topic: "the-topic", ErrorCode: protocol.ErrNone.Code()}}, }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 2}, - Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ + header: &protocol.RequestHeader{CorrelationID: 2}, + res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ Responses: []*protocol.ProduceResponse{ { Topic: "the-topic", @@ -318,8 +318,8 @@ func TestBroker_Run(t *testing.T) { }}, }, { - Header: &protocol.RequestHeader{CorrelationID: 3}, - Response: &protocol.Response{CorrelationID: 3, Body: &protocol.MetadataResponse{ + header: &protocol.RequestHeader{CorrelationID: 3}, + res: &protocol.Response{CorrelationID: 3, Body: &protocol.MetadataResponse{ Brokers: []*protocol.Broker{{NodeID: 1, Host: "localhost", Port: 9092}}, TopicMetadata: []*protocol.TopicMetadata{ {Topic: "the-topic", TopicErrorCode: protocol.ErrNone.Code(), PartitionMetadata: []*protocol.PartitionMetadata{{PartitionErrorCode: protocol.ErrNone.Code(), ParititionID: 0, Leader: 1, Replicas: []int32{1}, ISR: []int32{1}}}}, @@ -330,7 +330,7 @@ func TestBroker_Run(t *testing.T) { }, }, handle: func(t *testing.T, _ *Broker, ctx *Context) { - switch res := ctx.Response.(*protocol.Response).Body.(type) { + switch res := ctx.res.(*protocol.Response).Body.(type) { // handle timestamp explicitly since we don't know what // it'll be set to case *protocol.ProduceResponses: @@ -344,15 +344,15 @@ func TestBroker_Run(t *testing.T) { requestCh: make(chan *Context, 2), responseCh: make(chan *Context, 2), requests: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 2}, - Request: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ + header: &protocol.RequestHeader{CorrelationID: 2}, + req: &protocol.ProduceRequest{TopicData: []*protocol.TopicData{{ Topic: "another-topic", Data: []*protocol.Data{{ RecordSet: mustEncode(&protocol.MessageSet{Offset: 1, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}}}, }, responses: []*Context{{ - Header: &protocol.RequestHeader{CorrelationID: 2}, - Response: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ + header: &protocol.RequestHeader{CorrelationID: 2}, + res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{ Responses: []*protocol.ProduceResponse{{ Topic: "another-topic", PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, ErrorCode: protocol.ErrUnknownTopicOrPartition.Code()}}, @@ -360,7 +360,7 @@ func TestBroker_Run(t *testing.T) { }}}}, }, handle: func(t *testing.T, _ *Broker, ctx *Context) { - switch res := ctx.Response.(*protocol.Response).Body.(type) { + switch res := ctx.res.(*protocol.Response).Body.(type) { // handle timestamp explicitly since we don't know what // it'll be set to case *protocol.ProduceResponses: @@ -374,14 +374,14 @@ func TestBroker_Run(t *testing.T) { // requestCh: make(chan *Context, 2), // responseCh: make(chan *Context, 2), // requests: []*Context{{ - // Header: &protocol.RequestHeader{CorrelationID: 3}, - // Request: &protocol.FindCoordinatorRequest{ + // header: &protocol.RequestHeader{CorrelationID: 3}, + // req: &protocol.FindCoordinatorRequest{ // CoordinatorKey: "test-group", // }, // }}, // responses: []*Context{{ - // Header: &protocol.RequestHeader{CorrelationID: 3}, - // Response: &protocol.Response{CorrelationID: 3, Body: &protocol.FindCoordinatorResponse{ + // header: &protocol.RequestHeader{CorrelationID: 3}, + // res: &protocol.res{CorrelationID: 3, Body: &protocol.FindCoordinatorResponse{ // Coordinator: protocol.Coordinator{ // NodeID: 1, // Host: "localhost", @@ -446,7 +446,7 @@ func TestBroker_Run(t *testing.T) { request := tt.args.requests[i] reqSpan := b.tracer.StartSpan("request", opentracing.ChildOf(span.Context())) - ctx := &Context{Header: request.Header, Request: request.Request, Parent: opentracing.ContextWithSpan(runCtx, reqSpan)} + ctx := &Context{header: request.header, req: request.req, parent: opentracing.ContextWithSpan(runCtx, reqSpan)} tt.args.requestCh <- ctx @@ -456,8 +456,8 @@ func TestBroker_Run(t *testing.T) { tt.handle(t, b, ctx) } - if !reflect.DeepEqual(ctx.Response, tt.args.responses[i].Response) { - t.Errorf("got %s, want: %s", spewstr(ctx.Response), spewstr(tt.args.responses[i].Response)) + if !reflect.DeepEqual(ctx.res, tt.args.responses[i].res) { + t.Errorf("got %s, want: %s", spewstr(ctx.res), spewstr(tt.args.responses[i].res)) } } diff --git a/jocko/context.go b/jocko/context.go index 50f8ffad..12e35d1e 100644 --- a/jocko/context.go +++ b/jocko/context.go @@ -11,13 +11,25 @@ import ( type Context struct { sync.Mutex - Conn io.ReadWriter - Error error - Header *protocol.RequestHeader - Parent context.Context - Request interface{} - Response interface{} - values map[interface{}]interface{} + conn io.ReadWriter + err error + header *protocol.RequestHeader + parent context.Context + req interface{} + res interface{} + vals map[interface{}]interface{} +} + +func (ctx *Context) Request() interface{} { + return ctx.req +} + +func (ctx *Context) Response() interface{} { + return ctx.res +} + +func (c *Context) Header() *protocol.RequestHeader { + return c.header } func (ctx *Context) Deadline() (deadline time.Time, ok bool) { @@ -29,17 +41,17 @@ func (ctx *Context) Done() <-chan struct{} { } func (ctx *Context) Err() error { - return ctx.Error + return ctx.err } func (ctx *Context) Value(key interface{}) interface{} { ctx.Lock() - if ctx.values == nil { - ctx.values = make(map[interface{}]interface{}) + if ctx.vals == nil { + ctx.vals = make(map[interface{}]interface{}) } - val := ctx.values[key] + val := ctx.vals[key] if val == nil { - val = ctx.Parent.Value(key) + val = ctx.parent.Value(key) } ctx.Unlock() return val diff --git a/jocko/server.go b/jocko/server.go index cd290b61..e06acf71 100644 --- a/jocko/server.go +++ b/jocko/server.go @@ -254,10 +254,10 @@ func (s *Server) handleRequest(conn net.Conn) { ctx = context.WithValue(ctx, requestQueueSpanKey, queueSpan) s.requestCh <- &Context{ - Parent: ctx, - Header: header, - Request: req, - Conn: conn, + parent: ctx, + header: header, + req: req, + conn: conn, } } } @@ -268,11 +268,11 @@ func (s *Server) handleResponse(respCtx *Context) error { s.vlog(sp, "response", respCtx) defer psp.Finish() defer sp.Finish() - b, err := protocol.Encode(respCtx.Response.(protocol.Encoder)) + b, err := protocol.Encode(respCtx.res.(protocol.Encoder)) if err != nil { return err } - _, err = respCtx.Conn.Write(b) + _, err = respCtx.conn.Write(b) return err }