Skip to content

Commit

Permalink
Merge pull request travisjeffery#77 from travisjeffery/tests
Browse files Browse the repository at this point in the history
more broker tests
  • Loading branch information
Travis Jeffery authored Nov 6, 2017
2 parents be0ab37 + 37c8608 commit a4a403e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 26 deletions.
4 changes: 2 additions & 2 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco
}
for i, p := range req.PartitionStates {
partition, err := b.partition(p.Topic, p.Partition)
// TODO: seems ok to have protocol.ErrUnknownTopicOrPartition here?
if err != protocol.ErrNone {
if err != protocol.ErrUnknownTopicOrPartition && err != protocol.ErrNone {
setErr(i, p, err)
continue
}
Expand Down Expand Up @@ -289,6 +288,7 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco
continue
}
}
resp.Partitions[i] = &protocol.LeaderAndISRPartition{Partition: p.Partition, Topic: p.Topic, ErrorCode: protocol.ErrNone.Code()}
}
return resp
}
Expand Down
78 changes: 54 additions & 24 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,40 @@ func TestBroker_Run(t *testing.T) {
}}}},
},
},
{
name: "leader and isr leader new partition",
fields: newFields(),
args: args{
requestCh: make(chan jocko.Request, 2),
responseCh: make(chan jocko.Response, 2),
requests: []jocko.Request{{
Header: &protocol.RequestHeader{CorrelationID: 2},
Request: &protocol.LeaderAndISRRequest{
PartitionStates: []*protocol.PartitionState{
{
Topic: "the-topic",
Partition: 1,
ISR: []int32{1},
Replicas: []int32{1},
Leader: 1,
ZKVersion: 1,
},
},
}},
},
responses: []jocko.Response{{
Header: &protocol.RequestHeader{CorrelationID: 2},
Response: &protocol.Response{CorrelationID: 2, Body: &protocol.LeaderAndISRResponse{
Partitions: []*protocol.LeaderAndISRPartition{
{
ErrorCode: protocol.ErrNone.Code(),
Partition: 1,
Topic: "the-topic",
},
},
}}}},
},
},
}
for _, tt := range tests {
os.RemoveAll("/tmp/jocko")
Expand Down Expand Up @@ -1225,42 +1259,32 @@ func TestBroker_deletePartitions(t *testing.T) {
}

func TestBroker_Shutdown(t *testing.T) {
type fields struct {
logger *simplelog.Logger
id int32
topicMap map[string][]*jocko.Partition
replicators map[*jocko.Partition]*Replicator
brokerAddr string
logDir string
raft jocko.Raft
serf jocko.Serf
shutdownCh chan struct{}
shutdown bool
}
tests := []struct {
name string
fields fields
wantErr bool
}{
// TODO: Add test cases.
{
name: "shutdown ok",
fields: newFields(),
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &Broker{
logger: tt.fields.logger,
id: tt.fields.id,
topicMap: tt.fields.topicMap,
replicators: tt.fields.replicators,
brokerAddr: tt.fields.brokerAddr,
logDir: tt.fields.logDir,
raft: tt.fields.raft,
serf: tt.fields.serf,
shutdownCh: tt.fields.shutdownCh,
shutdown: tt.fields.shutdown,
b, err := New(tt.fields.id, Addr(tt.fields.brokerAddr), Serf(tt.fields.serf), Raft(tt.fields.raft), Logger(tt.fields.logger), RaftCommands(tt.fields.raftCommands), LogDir(tt.fields.logDir))
if err != nil {
t.Errorf("Broker.New() error = %v, wanted nil", err)
}
if err := b.Shutdown(); (err != nil) != tt.wantErr {
t.Errorf("Broker.Shutdown() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.fields.raft.ShutdownInvoked != true {
t.Errorf("did not shutdown raft")
}
if tt.fields.serf.ShutdownInvoked != true {
t.Errorf("did not shutdown raft")
}
})
}
}
Expand Down Expand Up @@ -1415,6 +1439,9 @@ func newFields() fields {
MemberFn: func(memberID int32) *jocko.ClusterMember {
return &jocko.ClusterMember{ID: 1}
},
ShutdownFn: func() error {
return nil
},
}
raft := &mock.Raft{
AddrFn: func() string {
Expand All @@ -1438,6 +1465,9 @@ func newFields() fields {
ApplyFn: func(jocko.RaftCommand) error {
return nil
},
ShutdownFn: func() error {
return nil
},
}
return fields{
topicMap: make(map[string][]*jocko.Partition),
Expand Down

0 comments on commit a4a403e

Please sign in to comment.