Skip to content

Commit

Permalink
broker: test join/sync (or start anyway)
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Aug 6, 2018
1 parent a1cb84e commit 4d26afa
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 101 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ vendor/*
.idea
*.log
*.index
NOTES.org
.dir-locals.el
.#*
.DS_Store
7 changes: 5 additions & 2 deletions commitlog/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

func TestIndex(t *testing.T) {
path, err := ioutil.TempDir("", "commitlog-index")
dir, err := ioutil.TempDir("", "commitlog-index")
require.NoError(t, err)
path := filepath.Join(dir, "test.index")

totalEntries := rand.Intn(10) + 10
//case for roundDown
Expand Down Expand Up @@ -73,8 +75,9 @@ func TestIndex(t *testing.T) {
}

func TestIndexScanner(t *testing.T) {
path, err := ioutil.TempDir("", "commitlog-index")
dir, err := ioutil.TempDir("", "commitlog-index")
require.NoError(t, err)
path := filepath.Join(dir, "test.index")

totalEntries := rand.Intn(10) + 10
//case for roundDown
Expand Down
50 changes: 35 additions & 15 deletions jocko/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (b *Broker) handleDeleteTopics(ctx *Context, reqs *protocol.DeleteTopicsReq
},
})
if err != nil {
protocol.ErrUnknown.WithErr(err)
return protocol.ErrUnknown.WithErr(err)
}
return protocol.ErrNone
})
Expand Down Expand Up @@ -559,13 +559,12 @@ func (b *Broker) handleFindCoordinator(ctx *Context, req *protocol.FindCoordinat
res := &protocol.FindCoordinatorResponse{}
res.APIVersion = req.Version()

// TODO: distribute this.
state := b.fsm.State()

var broker *metadata.Broker
var p *structs.Partition
var i int32

state := b.fsm.State()

topic, err := b.offsetsTopic(ctx)
if err != nil {
goto ERROR
Expand All @@ -592,7 +591,7 @@ ERROR:
if res.ErrorCode == 0 {
res.ErrorCode = protocol.ErrUnknown.Code()
}
log.Error.Printf("broker/%d: %s: coordinator error: %s", b.config.ID, broker, err)
log.Error.Printf("broker/%d: broker: %v: coordinator error: %s", b.config.ID, broker, err)

return res
}
Expand All @@ -613,16 +612,20 @@ func (b *Broker) handleJoinGroup(ctx *Context, r *protocol.JoinGroupRequest) *pr
res.ErrorCode = protocol.ErrUnknown.Code()
return res
}
// TODO: only try to create the group if the group is not unknown AND
// the member id is UNKNOWN, if member is specified but group does not
// exist we should reject the request
if group == nil {
// group doesn't exist so let's create it
group = &structs.Group{
Group: r.GroupID,
Coordinator: b.config.ID,
Members: make(map[string]structs.Member),
}
}
if r.MemberID == "" {
// for group member IDs -- can replace with something else
r.MemberID = uuid.NewV1().String()
r.MemberID = ctx.Header().ClientID + "-" + uuid.NewV1().String()
group.Members[r.MemberID] = structs.Member{ID: r.MemberID}
}
if group.LeaderID == "" {
Expand Down Expand Up @@ -721,7 +724,7 @@ func (b *Broker) handleSyncGroup(ctx *Context, r *protocol.SyncGroupRequest) *pr
if m, ok := group.Members[r.MemberID]; ok {
res.MemberAssignment = m.Assignment
} else {
panic("sync group: unknown member")
panic(fmt.Errorf("sync group: unknown member: %s", r.MemberID))
}
}

Expand Down Expand Up @@ -951,6 +954,7 @@ func (b *Broker) startReplica(replica *Replica) protocol.Error {
// TODO: think i need to just ensure/add the topic if it's not here yet

if topic == nil {
log.Info.Printf("broker/%d: start replica called on unknown topic: %s", b.config.ID, replica.Partition.Topic)
return protocol.ErrUnknownTopicOrPartition
}

Expand Down Expand Up @@ -978,7 +982,10 @@ func (b *Broker) createTopic(ctx *Context, topic *protocol.CreateTopicRequest) p
if t != nil {
return protocol.ErrTopicAlreadyExists
}
ps := b.buildPartitions(topic.Topic, topic.NumPartitions, topic.ReplicationFactor)
ps, err := b.buildPartitions(topic.Topic, topic.NumPartitions, topic.ReplicationFactor)
if err != protocol.ErrNone {
return err
}
tt := structs.Topic{
Topic: topic.Topic,
Partitions: make(map[int32][]int32),
Expand Down Expand Up @@ -1023,20 +1030,25 @@ func (b *Broker) createTopic(ctx *Context, topic *protocol.CreateTopicRequest) p
if err != nil {
return protocol.ErrUnknown.WithErr(err)
}
_, err = conn.LeaderAndISR(req)
res, err := conn.LeaderAndISR(req)
if err != nil {
// handle err and responses
return protocol.ErrUnknown.WithErr(err)
}
spew.Dump("leader and isr res", res)
}
}
return protocol.ErrNone
}

func (b *Broker) buildPartitions(topic string, partitionsCount int32, replicationFactor int16) []structs.Partition {
func (b *Broker) buildPartitions(topic string, partitionsCount int32, replicationFactor int16) ([]structs.Partition, protocol.Error) {
brokers := b.brokerLookup.Brokers()
count := len(brokers)

if int(replicationFactor) > count {
return nil, protocol.ErrInvalidReplicationFactor
}

// container/ring is dope af
r := ring.New(count)
for i := 0; i < r.Len(); i++ {
Expand Down Expand Up @@ -1066,7 +1078,7 @@ func (b *Broker) buildPartitions(topic string, partitionsCount int32, replicatio
partitions = append(partitions, partition)
}

return partitions
return partitions, protocol.ErrNone
}

// Leave is used to prepare for a graceful shutdown.
Expand Down Expand Up @@ -1269,10 +1281,9 @@ func (r Replica) String() string {

func (b *Broker) offsetsTopic(ctx *Context) (topic *structs.Topic, err error) {
state := b.fsm.State()
name := "__consumer_offsets"

// check if the topic exists already
_, topic, err = state.GetTopic(name)
_, topic, err = state.GetTopic(OffsetsTopicName)
if err != nil {
return
}
Expand All @@ -1281,9 +1292,13 @@ func (b *Broker) offsetsTopic(ctx *Context) (topic *structs.Topic, err error) {
}

// doesn't exist so let's create it
partitions := b.buildPartitions(name, 50, 3)
partitions, err := b.buildPartitions(OffsetsTopicName, 50, b.config.OffsetsTopicReplicationFactor)
if err != protocol.ErrNone {
return nil, err
}
topic = &structs.Topic{
Topic: "__consumer_offsets",
Topic: OffsetsTopicName,
Internal: true,
Partitions: make(map[int32][]int32),
}
for _, p := range partitions {
Expand All @@ -1292,6 +1307,11 @@ func (b *Broker) offsetsTopic(ctx *Context) (topic *structs.Topic, err error) {
_, err = b.raftApply(structs.RegisterTopicRequestType, structs.RegisterTopicRequest{
Topic: *topic,
})
for _, partition := range partitions {
if err := b.createPartition(partition); err != nil {
return nil, err
}
}
return
}

Expand Down
Loading

0 comments on commit 4d26afa

Please sign in to comment.