From 61bbdbf8a473ab1951897f08fc6ec57313d4e54c Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Mon, 6 Nov 2017 02:29:21 -0500 Subject: [PATCH 1/3] broker: test shutdown --- broker/broker_test.go | 44 ++++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/broker/broker_test.go b/broker/broker_test.go index 592615e3..c76140e5 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -1225,42 +1225,32 @@ func TestBroker_deletePartitions(t *testing.T) { } func TestBroker_Shutdown(t *testing.T) { - type fields struct { - logger *simplelog.Logger - id int32 - topicMap map[string][]*jocko.Partition - replicators map[*jocko.Partition]*Replicator - brokerAddr string - logDir string - raft jocko.Raft - serf jocko.Serf - shutdownCh chan struct{} - shutdown bool - } tests := []struct { name string fields fields wantErr bool }{ - // TODO: Add test cases. + { + name: "shutdown ok", + fields: newFields(), + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := &Broker{ - logger: tt.fields.logger, - id: tt.fields.id, - topicMap: tt.fields.topicMap, - replicators: tt.fields.replicators, - brokerAddr: tt.fields.brokerAddr, - logDir: tt.fields.logDir, - raft: tt.fields.raft, - serf: tt.fields.serf, - shutdownCh: tt.fields.shutdownCh, - shutdown: tt.fields.shutdown, + b, err := New(tt.fields.id, Addr(tt.fields.brokerAddr), Serf(tt.fields.serf), Raft(tt.fields.raft), Logger(tt.fields.logger), RaftCommands(tt.fields.raftCommands), LogDir(tt.fields.logDir)) + if err != nil { + t.Errorf("Broker.New() error = %v, wanted nil", err) } if err := b.Shutdown(); (err != nil) != tt.wantErr { t.Errorf("Broker.Shutdown() error = %v, wantErr %v", err, tt.wantErr) } + if tt.fields.raft.ShutdownInvoked != true { + t.Errorf("did not shutdown raft") + } + if tt.fields.serf.ShutdownInvoked != true { + t.Errorf("did not shutdown raft") + } }) } } @@ -1415,6 +1405,9 @@ func newFields() fields { MemberFn: func(memberID int32) *jocko.ClusterMember { return &jocko.ClusterMember{ID: 1} }, + ShutdownFn: func() error { + return nil + }, } raft := &mock.Raft{ AddrFn: func() string { @@ -1438,6 +1431,9 @@ func newFields() fields { ApplyFn: func(jocko.RaftCommand) error { return nil }, + ShutdownFn: func() error { + return nil + }, } return fields{ topicMap: make(map[string][]*jocko.Partition), From 89f2b7cd20dc12ea9dbd04db94d9b34dc6e6ee85 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Mon, 6 Nov 2017 02:45:20 -0500 Subject: [PATCH 2/3] broker: test add leader and isr new partition --- broker/broker.go | 3 ++- broker/broker_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/broker/broker.go b/broker/broker.go index 4924d129..abd7273d 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -257,7 +257,7 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco for i, p := range req.PartitionStates { partition, err := b.partition(p.Topic, p.Partition) // TODO: seems ok to have protocol.ErrUnknownTopicOrPartition here? - if err != protocol.ErrNone { + if err != protocol.ErrUnknownTopicOrPartition && err != protocol.ErrNone { setErr(i, p, err) continue } @@ -289,6 +289,7 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco continue } } + resp.Partitions[i] = &protocol.LeaderAndISRPartition{Partition: p.Partition, Topic: p.Topic, ErrorCode: protocol.ErrNone.Code()} } return resp } diff --git a/broker/broker_test.go b/broker/broker_test.go index c76140e5..ff590a00 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -444,6 +444,40 @@ func TestBroker_Run(t *testing.T) { }}}}, }, }, + { + name: "leader and isr leader new partition", + fields: newFields(), + args: args{ + requestCh: make(chan jocko.Request, 2), + responseCh: make(chan jocko.Response, 2), + requests: []jocko.Request{{ + Header: &protocol.RequestHeader{CorrelationID: 2}, + Request: &protocol.LeaderAndISRRequest{ + PartitionStates: []*protocol.PartitionState{ + { + Topic: "the-topic", + Partition: 1, + ISR: []int32{1}, + Replicas: []int32{1}, + Leader: 1, + ZKVersion: 1, + }, + }, + }}, + }, + responses: []jocko.Response{{ + Header: &protocol.RequestHeader{CorrelationID: 2}, + Response: &protocol.Response{CorrelationID: 2, Body: &protocol.LeaderAndISRResponse{ + Partitions: []*protocol.LeaderAndISRPartition{ + { + ErrorCode: protocol.ErrNone.Code(), + Partition: 1, + Topic: "the-topic", + }, + }, + }}}}, + }, + }, } for _, tt := range tests { os.RemoveAll("/tmp/jocko") From 37c8608dc701466a3c1313820b35368ceacee8b9 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Mon, 6 Nov 2017 02:50:38 -0500 Subject: [PATCH 3/3] rm comment --- broker/broker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/broker/broker.go b/broker/broker.go index abd7273d..c6480b2d 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -256,7 +256,6 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco } for i, p := range req.PartitionStates { partition, err := b.partition(p.Topic, p.Partition) - // TODO: seems ok to have protocol.ErrUnknownTopicOrPartition here? if err != protocol.ErrUnknownTopicOrPartition && err != protocol.ErrNone { setErr(i, p, err) continue