Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client address to ToxicStub to enable client aware toxics #246

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ type ToxicLink struct {
input *stream.ChanWriter
output *stream.ChanReader
direction stream.Direction
clientInfo toxics.ClientInfo
}

func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Direction) *ToxicLink {
func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Direction, clientAddress string) *ToxicLink {
link := &ToxicLink{
stubs: make([]*toxics.ToxicStub, len(collection.chain[direction]), cap(collection.chain[direction])),
proxy: proxy,
toxics: collection,
direction: direction,
clientInfo: toxics.ClientInfo{Address: clientAddress},
}

// Initialize the link with ToxicStubs
Expand All @@ -45,7 +47,7 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di
next = make(chan *stream.StreamChunk)
}

link.stubs[i] = toxics.NewToxicStub(last, next)
link.stubs[i] = toxics.NewToxicStub(last, next, link.clientInfo)
last = next
}
link.output = stream.NewChanReader(last)
Expand Down Expand Up @@ -92,7 +94,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
i := len(link.stubs)

newin := make(chan *stream.StreamChunk, toxic.BufferSize)
link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output))
link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output, link.clientInfo))

// Interrupt the last toxic so that we don't have a race when moving channels
if link.stubs[i-1].InterruptToxic() {
Expand Down
14 changes: 7 additions & 7 deletions link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestToxicsAreLoaded(t *testing.T) {

func TestStubInitializaation(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream)
link := NewToxicLink(nil, collection, stream.Downstream, "localhost")
if len(link.stubs) != 1 {
t.Fatalf("Link created with wrong number of stubs: %d != 1", len(link.stubs))
} else if cap(link.stubs) != toxics.Count()+1 {
Expand All @@ -31,7 +31,7 @@ func TestStubInitializaation(t *testing.T) {
}
}

func TestStubInitializaationWithToxics(t *testing.T) {
func TestStubInitializationWithToxics(t *testing.T) {
collection := NewToxicCollection(nil)
collection.chainAddToxic(&toxics.ToxicWrapper{
Toxic: new(toxics.LatencyToxic),
Expand All @@ -46,7 +46,7 @@ func TestStubInitializaationWithToxics(t *testing.T) {
Direction: stream.Downstream,
Toxicity: 1,
})
link := NewToxicLink(nil, collection, stream.Downstream)
link := NewToxicLink(nil, collection, stream.Downstream, "localhost")
if len(link.stubs) != 3 {
t.Fatalf("Link created with wrong number of stubs: %d != 3", len(link.stubs))
} else if cap(link.stubs) != toxics.Count()+1 {
Expand All @@ -63,7 +63,7 @@ func TestStubInitializaationWithToxics(t *testing.T) {

func TestAddRemoveStubs(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream)
link := NewToxicLink(nil, collection, stream.Downstream, "localhost")
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

Expand Down Expand Up @@ -106,7 +106,7 @@ func TestAddRemoveStubs(t *testing.T) {

func TestNoDataDropped(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream)
link := NewToxicLink(nil, collection, stream.Downstream, "localhost")
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

Expand Down Expand Up @@ -162,7 +162,7 @@ func TestNoDataDropped(t *testing.T) {

func TestToxicity(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream)
link := NewToxicLink(nil, collection, stream.Downstream, "localhost")
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestToxicity(t *testing.T) {

func TestStateCreated(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream)
link := NewToxicLink(nil, collection, stream.Downstream, "localhost")
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

Expand Down
4 changes: 2 additions & 2 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func (proxy *Proxy) server() {
proxy.connections.list[name+"upstream"] = upstream
proxy.connections.list[name+"downstream"] = client
proxy.connections.Unlock()
proxy.Toxics.StartLink(name+"upstream", client, upstream, stream.Upstream)
proxy.Toxics.StartLink(name+"downstream", upstream, client, stream.Downstream)
proxy.Toxics.StartLink(name+"upstream", client, upstream, stream.Upstream, client.RemoteAddr().String())
proxy.Toxics.StartLink(name+"downstream", upstream, client, stream.Downstream, client.RemoteAddr().String())
}
}

Expand Down
4 changes: 2 additions & 2 deletions toxic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ func (c *ToxicCollection) RemoveToxic(name string) error {
return ErrToxicNotFound
}

func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) {
func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction, clientAddress string) {
c.Lock()
defer c.Unlock()

link := NewToxicLink(c.proxy, c, direction)
link := NewToxicLink(c.proxy, c, direction, clientAddress)
link.Start(name, input, output)
c.links[name] = link
}
Expand Down
8 changes: 4 additions & 4 deletions toxics/limit_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func checkRemainingChunks(t *testing.T, output chan *stream.StreamChunk) {
func check(t *testing.T, toxic *toxics.LimitDataToxic, chunks [][]byte, expectedChunks [][]byte) {
input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk, 100)
stub := toxics.NewToxicStub(input, output)
stub := toxics.NewToxicStub(input, output, toxics.ClientInfo{Address: "localhost"})
stub.State = toxic.NewState()

go toxic.Pipe(stub)
Expand All @@ -54,7 +54,7 @@ func TestLimitDataToxicMayBeRestarted(t *testing.T) {

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk, 100)
stub := toxics.NewToxicStub(input, output)
stub := toxics.NewToxicStub(input, output, toxics.ClientInfo{Address:"localhost"})
stub.State = toxic.NewState()

buf := buffer(90)
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestLimitDataToxicMayBeInterrupted(t *testing.T) {

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
stub := toxics.NewToxicStub(input, output, toxics.ClientInfo{"localhost"})
stub.State = toxic.NewState()

go func() {
Expand All @@ -100,7 +100,7 @@ func TestLimitDataToxicNilShouldClosePipe(t *testing.T) {

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
stub := toxics.NewToxicStub(input, output, toxics.ClientInfo{Address: "localhost"})
stub.State = toxic.NewState()

go func() {
Expand Down
4 changes: 3 additions & 1 deletion toxics/slicer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ func TestSlicerToxic(t *testing.T) {

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
stub := toxics.NewToxicStub(input, output, toxics.ClientInfo{
Address: "localhost",
})

done := make(chan bool)
go func() {
Expand Down
10 changes: 8 additions & 2 deletions toxics/toxic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/Shopify/toxiproxy/stream"
)

// A Toxic is something that can be attatched to a link to modify the way
// A Toxic is something that can be attached to a link to modify the way
// data can be passed through (for example, by adding latency)
//
// Toxic
Expand Down Expand Up @@ -62,14 +62,20 @@ type ToxicStub struct {
Interrupt chan struct{}
running chan struct{}
closed chan struct{}
clientInfo ClientInfo
}

func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub {
type ClientInfo struct {
Address string
}

func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk, clientInfo ClientInfo) *ToxicStub {
return &ToxicStub{
Interrupt: make(chan struct{}),
closed: make(chan struct{}),
Input: input,
Output: output,
clientInfo: clientInfo,
}
}

Expand Down