Skip to content

Commit

Permalink
refactor ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Jun 10, 2018
1 parent 8a6348e commit a840da5
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 100 deletions.
14 changes: 7 additions & 7 deletions jocko/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses ch
queueSpan.Finish()
}

switch req := reqCtx.Request.(type) {
switch req := reqCtx.req.(type) {
case *protocol.ProduceRequest:
response = b.handleProduce(reqCtx, req)
case *protocol.FetchRequest:
Expand Down Expand Up @@ -195,11 +195,11 @@ func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses ch
responseCtx := context.WithValue(reqCtx, responseQueueSpanKey, queueSpan)

responses <- &Context{
Parent: responseCtx,
Conn: reqCtx.Conn,
Header: reqCtx.Header,
Response: &protocol.Response{
CorrelationID: reqCtx.Header.CorrelationID,
parent: responseCtx,
conn: reqCtx.conn,
header: reqCtx.header,
res: &protocol.Response{
CorrelationID: reqCtx.header.CorrelationID,
Body: response,
},
}
Expand All @@ -215,7 +215,7 @@ func (b *Broker) JoinLAN(addrs ...string) protocol.Error {
return protocol.ErrNone
}

// Request handling.
// req handling.

func span(ctx context.Context, tracer opentracing.Tracer, op string) opentracing.Span {
if ctx == nil {
Expand Down
Loading

0 comments on commit a840da5

Please sign in to comment.