Skip to content

Commit

Permalink
protocol: implement marshal log object for request/responses
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Jun 11, 2018
1 parent a840da5 commit e4e0134
Show file tree
Hide file tree
Showing 58 changed files with 770 additions and 92 deletions.
53 changes: 32 additions & 21 deletions jocko/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ func (b *Broker) handleLeaderAndISR(ctx *Context, req *protocol.LeaderAndISRRequ
}
}
for i, p := range req.PartitionStates {
_, err := b.replicaLookup.Replica(p.Topic, p.Partition)
isNew := err != nil

// TODO: need to replace the replica regardless
replica := &Replica{
BrokerID: b.config.ID,
Expand All @@ -341,18 +338,25 @@ func (b *Broker) handleLeaderAndISR(ctx *Context, req *protocol.LeaderAndISRRequ
}
b.replicaLookup.AddReplica(replica)

if err := b.startReplica(replica); err != protocol.ErrNone {
setErr(i, p, err)
continue
}
if p.Leader == b.config.ID && (replica.Partition.Leader != b.config.ID || isNew) {
if p.Leader == b.config.ID && (replica.Partition.Leader == b.config.ID) {
// is command asking this broker to be the new leader for p and this broker is not already the leader for

if err := b.startReplica(replica); err != protocol.ErrNone {
setErr(i, p, err)
continue
}

if err := b.becomeLeader(replica, p); err != protocol.ErrNone {
setErr(i, p, err)
continue
}
} else if contains(p.Replicas, b.config.ID) && (!contains(replica.Partition.AR, p.Leader) || isNew) {
} else if contains(p.Replicas, b.config.ID) && (p.Leader != b.config.ID) {
// is command asking this broker to follow leader who it isn't a leader of already
if err := b.startReplica(replica); err != protocol.ErrNone {
setErr(i, p, err)
continue
}

if err := b.becomeFollower(replica, p); err != protocol.ErrNone {
setErr(i, p, err)
continue
Expand Down Expand Up @@ -396,12 +400,12 @@ func (b *Broker) handleOffsets(ctx *Context, req *protocol.OffsetsRequest) *prot
return oResp
}

func (b *Broker) handleProduce(ctx *Context, req *protocol.ProduceRequest) *protocol.ProduceResponses {
func (b *Broker) handleProduce(ctx *Context, req *protocol.ProduceRequest) *protocol.ProduceResponse {
sp := span(ctx, b.tracer, "produce")
defer sp.Finish()
resp := new(protocol.ProduceResponses)
resp := new(protocol.ProduceResponse)
resp.APIVersion = req.Version()
resp.Responses = make([]*protocol.ProduceResponse, len(req.TopicData))
resp.Responses = make([]*protocol.ProduceTopicResponse, len(req.TopicData))
for i, td := range req.TopicData {
presps := make([]*protocol.ProducePartitionResponse, len(td.Data))
for j, p := range td.Data {
Expand Down Expand Up @@ -438,7 +442,7 @@ func (b *Broker) handleProduce(ctx *Context, req *protocol.ProduceRequest) *prot
presp.LogAppendTime = time.Now()
presps[j] = presp
}
resp.Responses[i] = &protocol.ProduceResponse{
resp.Responses[i] = &protocol.ProduceTopicResponse{
Topic: td.Topic,
PartitionResponses: presps,
}
Expand Down Expand Up @@ -494,20 +498,20 @@ func (b *Broker) handleMetadata(ctx *Context, req *protocol.MetadataRequest) *pr
_, p, err := state.GetPartition(topic.Topic, id)
if err != nil {
partitionMetadata = append(partitionMetadata, &protocol.PartitionMetadata{
ParititionID: id,
PartitionID: id,
PartitionErrorCode: protocol.ErrUnknown.Code(),
})
continue
}
if p == nil {
partitionMetadata = append(partitionMetadata, &protocol.PartitionMetadata{
ParititionID: id,
PartitionID: id,
PartitionErrorCode: protocol.ErrUnknownTopicOrPartition.Code(),
})
continue
}
partitionMetadata = append(partitionMetadata, &protocol.PartitionMetadata{
ParititionID: p.ID,
PartitionID: p.ID,
PartitionErrorCode: protocol.ErrNone.Code(),
Leader: p.Leader,
Replicas: p.AR,
Expand Down Expand Up @@ -743,16 +747,16 @@ func (b *Broker) handleHeartbeat(ctx *Context, r *protocol.HeartbeatRequest) *pr
return resp
}

func (b *Broker) handleFetch(ctx *Context, r *protocol.FetchRequest) *protocol.FetchResponses {
func (b *Broker) handleFetch(ctx *Context, r *protocol.FetchRequest) *protocol.FetchResponse {
sp := span(ctx, b.tracer, "fetch")
defer sp.Finish()
fresp := &protocol.FetchResponses{
Responses: make([]*protocol.FetchResponse, len(r.Topics)),
fresp := &protocol.FetchResponse{
Responses: make(protocol.FetchTopicResponses, len(r.Topics)),
}
fresp.APIVersion = r.Version()
received := time.Now()
for i, topic := range r.Topics {
fr := &protocol.FetchResponse{
fr := &protocol.FetchTopicResponse{
Topic: topic.Topic,
PartitionResponses: make([]*protocol.FetchPartitionResponse, len(topic.Partitions)),
}
Expand Down Expand Up @@ -957,6 +961,8 @@ func (b *Broker) startReplica(replica *Replica) protocol.Error {
state := b.fsm.State()
_, topic, _ := state.GetTopic(replica.Partition.Topic)

// TODO: think i need to just ensure/add the topic if it's not here yet

if topic == nil {
return protocol.ErrUnknownTopicOrPartition
}
Expand Down Expand Up @@ -1183,7 +1189,8 @@ func (b *Broker) becomeFollower(replica *Replica, cmd *protocol.PartitionState)
if err != nil {
return protocol.ErrUnknown.WithErr(err)
}
r := NewReplicator(ReplicatorConfig{}, replica, conn, b.logger)
logger := b.logger.With(log.Int32("leader", replica.Partition.Leader))
r := NewReplicator(ReplicatorConfig{}, replica, conn, logger)
replica.Replicator = r
if !b.config.DevMode {
r.Replicate()
Expand Down Expand Up @@ -1270,6 +1277,10 @@ type Replica struct {
sync.Mutex
}

func (r Replica) String() string {
return fmt.Sprintf("replica: %d {broker: %d, leader: %d, hw: %d, leo: %d}", r.Partition.ID, r.BrokerID, r.Partition.Leader, r.Hw, r.Leo)
}

func (b *Broker) offsetsTopic(ctx *Context) (topic *structs.Topic, err error) {
state := b.fsm.State()
name := "__consumer_offsets"
Expand Down
60 changes: 30 additions & 30 deletions jocko/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func TestBroker_Run(t *testing.T) {
requestCh: make(chan *Context, 2),
responseCh: make(chan *Context, 2),
requests: []*Context{{
header: &protocol.RequestHeader{CorrelationID: 1},
req: &protocol.APIVersionsRequest{},
header: &protocol.RequestHeader{CorrelationID: 1},
req: &protocol.APIVersionsRequest{},
}},
responses: []*Context{{
header: &protocol.RequestHeader{CorrelationID: 1},
res: &protocol.Response{CorrelationID: 1, Body: apiVersions},
header: &protocol.RequestHeader{CorrelationID: 1},
res: &protocol.Response{CorrelationID: 1, Body: apiVersions},
}},
},
},
Expand Down Expand Up @@ -114,8 +114,8 @@ func TestBroker_Run(t *testing.T) {
NumPartitions: 1,
ReplicationFactor: 1,
}}}}, {
header: &protocol.RequestHeader{CorrelationID: 2},
req: &protocol.DeleteTopicsRequest{Topics: []string{"the-topic"}}},
header: &protocol.RequestHeader{CorrelationID: 2},
req: &protocol.DeleteTopicsRequest{Topics: []string{"the-topic"}}},
},
responses: []*Context{{
header: &protocol.RequestHeader{CorrelationID: 1},
Expand Down Expand Up @@ -151,12 +151,12 @@ func TestBroker_Run(t *testing.T) {
RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}},
},
{
header: &protocol.RequestHeader{CorrelationID: 3},
req: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -1}}}}},
header: &protocol.RequestHeader{CorrelationID: 3},
req: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -1}}}}},
},
{
header: &protocol.RequestHeader{CorrelationID: 4},
req: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -2}}}}},
header: &protocol.RequestHeader{CorrelationID: 4},
req: &protocol.OffsetsRequest{ReplicaID: 0, Topics: []*protocol.OffsetsTopic{{Topic: "the-topic", Partitions: []*protocol.OffsetsPartition{{Partition: 0, Timestamp: -2}}}}},
},
},
responses: []*Context{
Expand All @@ -168,8 +168,8 @@ func TestBroker_Run(t *testing.T) {
},
{
header: &protocol.RequestHeader{CorrelationID: 2},
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{{
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponse{
Responses: []*protocol.ProduceTopicResponse{{
Topic: "the-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}},
}},
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestBroker_Run(t *testing.T) {
switch res := ctx.res.(*protocol.Response).Body.(type) {
// handle timestamp explicitly since we don't know what
// it'll be set to
case *protocol.ProduceResponses:
case *protocol.ProduceResponse:
handleProduceResponse(t, res)
}
},
Expand All @@ -226,8 +226,8 @@ func TestBroker_Run(t *testing.T) {
RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}},
},
{
header: &protocol.RequestHeader{CorrelationID: 3},
req: &protocol.FetchRequest{ReplicaID: 1, MinBytes: 5, Topics: []*protocol.FetchTopic{{Topic: "the-topic", Partitions: []*protocol.FetchPartition{{Partition: 0, FetchOffset: 0, MaxBytes: 100}}}}},
header: &protocol.RequestHeader{CorrelationID: 3},
req: &protocol.FetchRequest{ReplicaID: 1, MinBytes: 5, Topics: []*protocol.FetchTopic{{Topic: "the-topic", Partitions: []*protocol.FetchPartition{{Partition: 0, FetchOffset: 0, MaxBytes: 100}}}}},
},
},
responses: []*Context{
Expand All @@ -239,8 +239,8 @@ func TestBroker_Run(t *testing.T) {
},
{
header: &protocol.RequestHeader{CorrelationID: 2},
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponse{
Responses: []*protocol.ProduceTopicResponse{
{
Topic: "the-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}},
Expand All @@ -250,8 +250,8 @@ func TestBroker_Run(t *testing.T) {
},
{
header: &protocol.RequestHeader{CorrelationID: 3},
res: &protocol.Response{CorrelationID: 3, Body: &protocol.FetchResponses{
Responses: []*protocol.FetchResponse{{
res: &protocol.Response{CorrelationID: 3, Body: &protocol.FetchResponse{
Responses: protocol.FetchTopicResponses{{
Topic: "the-topic",
PartitionResponses: []*protocol.FetchPartitionResponse{{
Partition: 0,
Expand All @@ -268,7 +268,7 @@ func TestBroker_Run(t *testing.T) {
switch res := ctx.res.(*protocol.Response).Body.(type) {
// handle timestamp explicitly since we don't know what
// it'll be set to
case *protocol.ProduceResponses:
case *protocol.ProduceResponse:
handleProduceResponse(t, res)
}
},
Expand All @@ -295,8 +295,8 @@ func TestBroker_Run(t *testing.T) {
RecordSet: mustEncode(&protocol.MessageSet{Offset: 0, Messages: []*protocol.Message{{Value: []byte("The message.")}}})}}}}},
},
{
header: &protocol.RequestHeader{CorrelationID: 3},
req: &protocol.MetadataRequest{Topics: []string{"the-topic", "unknown-topic"}},
header: &protocol.RequestHeader{CorrelationID: 3},
req: &protocol.MetadataRequest{Topics: []string{"the-topic", "unknown-topic"}},
},
},
responses: []*Context{
Expand All @@ -308,8 +308,8 @@ func TestBroker_Run(t *testing.T) {
},
{
header: &protocol.RequestHeader{CorrelationID: 2},
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponse{
Responses: []*protocol.ProduceTopicResponse{
{
Topic: "the-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, BaseOffset: 0, ErrorCode: protocol.ErrNone.Code()}},
Expand All @@ -322,7 +322,7 @@ func TestBroker_Run(t *testing.T) {
res: &protocol.Response{CorrelationID: 3, Body: &protocol.MetadataResponse{
Brokers: []*protocol.Broker{{NodeID: 1, Host: "localhost", Port: 9092}},
TopicMetadata: []*protocol.TopicMetadata{
{Topic: "the-topic", TopicErrorCode: protocol.ErrNone.Code(), PartitionMetadata: []*protocol.PartitionMetadata{{PartitionErrorCode: protocol.ErrNone.Code(), ParititionID: 0, Leader: 1, Replicas: []int32{1}, ISR: []int32{1}}}},
{Topic: "the-topic", TopicErrorCode: protocol.ErrNone.Code(), PartitionMetadata: []*protocol.PartitionMetadata{{PartitionErrorCode: protocol.ErrNone.Code(), PartitionID: 0, Leader: 1, Replicas: []int32{1}, ISR: []int32{1}}}},
{Topic: "unknown-topic", TopicErrorCode: protocol.ErrUnknownTopicOrPartition.Code()},
},
}},
Expand All @@ -333,7 +333,7 @@ func TestBroker_Run(t *testing.T) {
switch res := ctx.res.(*protocol.Response).Body.(type) {
// handle timestamp explicitly since we don't know what
// it'll be set to
case *protocol.ProduceResponses:
case *protocol.ProduceResponse:
handleProduceResponse(t, res)
}
},
Expand All @@ -352,8 +352,8 @@ func TestBroker_Run(t *testing.T) {
},
responses: []*Context{{
header: &protocol.RequestHeader{CorrelationID: 2},
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponses{
Responses: []*protocol.ProduceResponse{{
res: &protocol.Response{CorrelationID: 2, Body: &protocol.ProduceResponse{
Responses: []*protocol.ProduceTopicResponse{{
Topic: "another-topic",
PartitionResponses: []*protocol.ProducePartitionResponse{{Partition: 0, ErrorCode: protocol.ErrUnknownTopicOrPartition.Code()}},
}},
Expand All @@ -363,7 +363,7 @@ func TestBroker_Run(t *testing.T) {
switch res := ctx.res.(*protocol.Response).Body.(type) {
// handle timestamp explicitly since we don't know what
// it'll be set to
case *protocol.ProduceResponses:
case *protocol.ProduceResponse:
handleProduceResponse(t, res)
}
},
Expand Down Expand Up @@ -777,7 +777,7 @@ func wantPeers(s *Broker, peers int) error {
return nil
}

func handleProduceResponse(t *testing.T, res *protocol.ProduceResponses) {
func handleProduceResponse(t *testing.T, res *protocol.ProduceResponse) {
for _, response := range res.Responses {
for _, pr := range response.PartitionResponses {
if pr.ErrorCode != protocol.ErrNone.Code() {
Expand Down
4 changes: 2 additions & 2 deletions jocko/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (c *Conn) CreateTopics(req *protocol.CreateTopicRequests) (*protocol.Create
}

// Fetch sends a fetch request and returns the response.
func (c *Conn) Fetch(req *protocol.FetchRequest) (*protocol.FetchResponses, error) {
var resp protocol.FetchResponses
func (c *Conn) Fetch(req *protocol.FetchRequest) (*protocol.FetchResponse, error) {
var resp protocol.FetchResponse
err := c.readOperation(func(deadline time.Time, id int32) error {
return c.writeRequest(req)
}, func(deadline time.Time, size int) error {
Expand Down
20 changes: 17 additions & 3 deletions jocko/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"time"

"github.com/travisjeffery/jocko/protocol"
"go.uber.org/zap/zapcore"
)

type Context struct {
sync.Mutex
mu sync.Mutex
conn io.ReadWriter
err error
header *protocol.RequestHeader
Expand Down Expand Up @@ -45,14 +46,27 @@ func (ctx *Context) Err() error {
}

func (ctx *Context) Value(key interface{}) interface{} {
ctx.Lock()
ctx.mu.Lock()
if ctx.vals == nil {
ctx.vals = make(map[interface{}]interface{})
}
val := ctx.vals[key]
if val == nil {
val = ctx.parent.Value(key)
}
ctx.Unlock()
ctx.mu.Unlock()
return val
}

func (ctx *Context) MarshalLogObject(e zapcore.ObjectEncoder) error {
if ctx.header != nil {
e.AddObject("header", ctx.header)
}
if ctx.req != nil {
e.AddObject("request", ctx.req.(zapcore.ObjectMarshaler))
}
if ctx.res != nil {
e.AddObject("response", ctx.res.(zapcore.ObjectMarshaler))
}
return nil
}
2 changes: 1 addition & 1 deletion jocko/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// Client is used to request other brokers.
type client interface {
Fetch(fetchRequest *protocol.FetchRequest) (*protocol.FetchResponses, error)
Fetch(fetchRequest *protocol.FetchRequest) (*protocol.FetchResponse, error)
CreateTopics(createRequest *protocol.CreateTopicRequests) (*protocol.CreateTopicsResponse, error)
LeaderAndISR(request *protocol.LeaderAndISRRequest) (*protocol.LeaderAndISRResponse, error)
// others
Expand Down
Loading

0 comments on commit e4e0134

Please sign in to comment.