From 2475d8cd97478d4e6cb650a61918d57f79e68a37 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Tue, 13 Sep 2016 16:04:00 -0400 Subject: [PATCH 1/7] Start adding bidirectional toxics, and http toxic --- link.go | 25 +++++++++++----- proxy.go | 7 ++--- toxic_collection.go | 72 +++++++++++++++++++++++++++++---------------- toxics/http.go | 64 ++++++++++++++++++++++++++++++++++++++++ toxics/toxic.go | 45 ++++++++++++++++++++++------ 5 files changed, 165 insertions(+), 48 deletions(-) create mode 100644 toxics/http.go diff --git a/link.go b/link.go index 7a67ae16..939f0867 100644 --- a/link.go +++ b/link.go @@ -18,12 +18,13 @@ import ( // Input > ToxicStub > ToxicStub > Output // type ToxicLink struct { - stubs []*toxics.ToxicStub - proxy *Proxy - toxics *ToxicCollection - input *stream.ChanWriter - output *stream.ChanReader - direction stream.Direction + stubs []*toxics.ToxicStub + proxy *Proxy + toxics *ToxicCollection + input *stream.ChanWriter + output *stream.ChanReader + direction stream.Direction + pairedLink *ToxicLink } func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Direction) *ToxicLink { @@ -67,7 +68,11 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) }() for i, toxic := range link.toxics.chain[link.direction] { if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { - link.stubs[i].State = stateful.NewState() + if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { + link.stubs[i].State = stateful.NewState() + } else { + link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State + } } go link.stubs[i].Run(toxic) @@ -99,7 +104,11 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { link.stubs[i-1].Output = newin if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { - link.stubs[i].State = stateful.NewState() + if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { + link.stubs[i].State = stateful.NewState() + } else { + link.pairedLink.stubs[toxic.PairedToxic.Index].State = link.stubs[i].State + } } go link.stubs[i].Run(toxic) diff --git a/proxy.go b/proxy.go index 1d165053..f3da6b3a 100644 --- a/proxy.go +++ b/proxy.go @@ -2,13 +2,11 @@ package toxiproxy import ( "errors" + "net" "sync" - "github.com/Shopify/toxiproxy/stream" "github.com/sirupsen/logrus" tomb "gopkg.in/tomb.v1" - - "net" ) // Proxy represents the proxy in its entirity with all its links. The main @@ -176,8 +174,7 @@ func (proxy *Proxy) server() { proxy.connections.list[name+"upstream"] = upstream proxy.connections.list[name+"downstream"] = client proxy.connections.Unlock() - proxy.Toxics.StartLink(name+"upstream", client, upstream, stream.Upstream) - proxy.Toxics.StartLink(name+"downstream", upstream, client, stream.Downstream) + proxy.Toxics.StartLinks(name, client, upstream) } } diff --git a/toxic_collection.go b/toxic_collection.go index 3aded164..01f19598 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "net" "strings" "sync" @@ -67,12 +68,11 @@ func (c *ToxicCollection) GetToxicArray() []toxics.Toxic { result := make([]toxics.Toxic, 0) for dir := range c.chain { - for i, toxic := range c.chain[dir] { - if i == 0 { - // Skip the first noop toxic, it should not be visible - continue + for _, toxic := range c.chain[dir] { + if len(toxic.Name) > 0 { + // Skip toxics with no name, they should be hidden + result = append(result, toxic) } - result = append(result, toxic) } } return result @@ -96,20 +96,31 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er return nil, joinError(err, ErrBadRequestBody) } - switch strings.ToLower(wrapper.Stream) { - case "downstream": - wrapper.Direction = stream.Downstream - case "upstream": - wrapper.Direction = stream.Upstream - default: - return nil, ErrInvalidStream + if toxics.New(wrapper) == nil { + return nil, ErrInvalidToxicType } - if wrapper.Name == "" { - wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream) + + if wrapper.PairedToxic == nil { + switch strings.ToLower(wrapper.Stream) { + case "downstream": + wrapper.Direction = stream.Downstream + case "upstream": + wrapper.Direction = stream.Upstream + default: + return nil, ErrInvalidStream + } + } else { + wrapper.Stream = "both" + wrapper.Direction = stream.Downstream + wrapper.PairedToxic.Direction = stream.Upstream } - if toxics.New(wrapper) == nil { - return nil, ErrInvalidToxicType + if wrapper.Name == "" { + if wrapper.PairedToxic != nil { + wrapper.Name = wrapper.Type + } else { + wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream) + } } found := c.findToxicByName(wrapper.Name) @@ -129,9 +140,13 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er } c.chainAddToxic(wrapper) + if wrapper.PairedToxic != nil { + c.chainAddToxic(wrapper.PairedToxic) + } return wrapper, nil } +// tODO Update bidirectional toxics func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics.ToxicWrapper, error) { c.Lock() defer c.Unlock() @@ -157,6 +172,7 @@ func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics. return nil, ErrToxicNotFound } +// TODO Remove bidirectional toxics func (c *ToxicCollection) RemoveToxic(name string) error { c.Lock() defer c.Unlock() @@ -169,13 +185,21 @@ func (c *ToxicCollection) RemoveToxic(name string) error { return ErrToxicNotFound } -func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) { +func (c *ToxicCollection) StartLinks(name string, client, upstream net.Conn) { c.Lock() defer c.Unlock() - link := NewToxicLink(c.proxy, c, direction) - link.Start(name, input, output) - c.links[name] = link + linkUp := NewToxicLink(c.proxy, c, stream.Upstream) + linkDown := NewToxicLink(c.proxy, c, stream.Downstream) + + linkUp.pairedLink = linkDown + linkDown.pairedLink = linkUp + + linkUp.Start(name+"upstream", client, upstream) + linkDown.Start(name+"downstream", upstream, client) + + c.links[name+"upstream"] = linkUp + c.links[name+"downstream"] = linkDown } func (c *ToxicCollection) RemoveLink(name string) { @@ -187,12 +211,8 @@ func (c *ToxicCollection) RemoveLink(name string) { // All following functions assume the lock is already grabbed func (c *ToxicCollection) findToxicByName(name string) *toxics.ToxicWrapper { for dir := range c.chain { - for i, toxic := range c.chain[dir] { - if i == 0 { - // Skip the first noop toxic, it has no name - continue - } - if toxic.Name == name { + for _, toxic := range c.chain[dir] { + if len(toxic.Name) > 0 && toxic.Name == name { return toxic } } diff --git a/toxics/http.go b/toxics/http.go new file mode 100644 index 00000000..abf66252 --- /dev/null +++ b/toxics/http.go @@ -0,0 +1,64 @@ +package toxics + +import ( + "bufio" + "bytes" + "fmt" + "io" + "net/http" + + "github.com/Shopify/toxiproxy/stream" +) + +type HttpToxic struct{} + +type HttpToxicState struct { + Shared bool +} + +func (t *HttpToxic) ModifyResponse(resp *http.Response) { + resp.Header.Set("Location", "https://github.com/Shopify/toxiproxy") +} + +func (t *HttpToxic) PipeRequest(stub *ToxicStub) { + state := stub.State.(*HttpToxicState) + state.Shared = true + // TODO + new(NoopToxic).Pipe(stub) +} + +func (t *HttpToxic) Pipe(stub *ToxicStub) { + state := stub.State.(*HttpToxicState) + + buffer := bytes.NewBuffer(make([]byte, 0, 32*1024)) + writer := stream.NewChanWriter(stub.Output) + reader := stream.NewChanReader(stub.Input) + reader.SetInterrupt(stub.Interrupt) + for { + tee := io.TeeReader(reader, buffer) + resp, err := http.ReadResponse(bufio.NewReader(tee), nil) + if err == stream.ErrInterrupted { + buffer.WriteTo(writer) + return + } else if err == io.EOF || err == io.ErrUnexpectedEOF { + stub.Close() + return + } + if err != nil { + buffer.WriteTo(writer) + } else { + fmt.Println("Shared:", state.Shared) // This should be true if the shared state is working + t.ModifyResponse(resp) + resp.Write(writer) + } + buffer.Reset() + } +} + +func (t *HttpToxic) NewState() interface{} { + return new(HttpToxicState) +} + +func init() { + Register("http", new(HttpToxic)) +} diff --git a/toxics/toxic.go b/toxics/toxic.go index bccd5f88..68764ebc 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -31,6 +31,7 @@ type CleanupToxic interface { Cleanup(*ToxicStub) } +// Buffered toxics include a buffer for incoming data so that delays don't block other toxics. type BufferedToxic interface { // Defines the size of buffer this toxic should use GetBufferSize() int @@ -44,15 +45,26 @@ type StatefulToxic interface { NewState() interface{} } +// TODO Test everything about bidirectional toxics +// - Toxicity +// - Duplicate names +// - Hidden toxic pair +// - Add/Update/Remove +// - State +type BidirectionalToxic interface { + PipeRequest(*ToxicStub) +} + type ToxicWrapper struct { - Toxic `json:"attributes"` - Name string `json:"name"` - Type string `json:"type"` - Stream string `json:"stream"` - Toxicity float32 `json:"toxicity"` - Direction stream.Direction `json:"-"` - Index int `json:"-"` - BufferSize int `json:"-"` + Toxic `json:"attributes"` + Name string `json:"name"` + Type string `json:"type"` + Stream string `json:"stream"` + Toxicity float32 `json:"toxicity"` + Direction stream.Direction `json:"-"` + Index int `json:"-"` + BufferSize int `json:"-"` + PairedToxic *ToxicWrapper `json:"-"` } type ToxicStub struct { @@ -78,8 +90,14 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream func (s *ToxicStub) Run(toxic *ToxicWrapper) { s.running = make(chan struct{}) defer close(s.running) + // TODO: How will toxicity work with bidirectional toxics? if rand.Float32() < toxic.Toxicity { - toxic.Pipe(s) + if toxic.PairedToxic == nil || toxic.Direction == stream.Downstream { + toxic.Pipe(s) + } else { + bidirectional := toxic.Toxic.(BidirectionalToxic) + bidirectional.PipeRequest(s) + } } else { new(NoopToxic).Pipe(s) } @@ -140,6 +158,15 @@ func New(wrapper *ToxicWrapper) Toxic { } else { wrapper.BufferSize = 0 } + if _, ok := wrapper.Toxic.(BidirectionalToxic); ok { + wrapper.PairedToxic = &ToxicWrapper{ + Toxic: wrapper.Toxic, + Type: wrapper.Type, + Toxicity: 1, // TODO how will toxicity work? + BufferSize: wrapper.BufferSize, + PairedToxic: wrapper, + } + } return wrapper.Toxic } From 2989b17636e079e81e6bb699cf57c926955ce472 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Tue, 13 Sep 2016 18:11:39 -0400 Subject: [PATCH 2/7] Finish some TODO's and add request parsing for http toxic --- link.go | 1 + toxic_collection.go | 8 ++++++-- toxics/http.go | 43 +++++++++++++++++++++++++++++++++++-------- toxics/toxic.go | 6 ++++-- 4 files changed, 46 insertions(+), 12 deletions(-) diff --git a/link.go b/link.go index 939f0867..26e770cc 100644 --- a/link.go +++ b/link.go @@ -72,6 +72,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) link.stubs[i].State = stateful.NewState() } else { link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State + link.stubs[i].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity } } diff --git a/toxic_collection.go b/toxic_collection.go index 01f19598..7cae0a46 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -146,7 +146,6 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er return wrapper, nil } -// tODO Update bidirectional toxics func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics.ToxicWrapper, error) { c.Lock() defer c.Unlock() @@ -166,19 +165,24 @@ func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics. } toxic.Toxicity = attrs.Toxicity + if toxic.PairedToxic != nil { + c.chainUpdateToxic(toxic.PairedToxic) + } c.chainUpdateToxic(toxic) return toxic, nil } return nil, ErrToxicNotFound } -// TODO Remove bidirectional toxics func (c *ToxicCollection) RemoveToxic(name string) error { c.Lock() defer c.Unlock() toxic := c.findToxicByName(name) if toxic != nil { + if toxic.PairedToxic != nil { + c.chainRemoveToxic(toxic.PairedToxic) + } c.chainRemoveToxic(toxic) return nil } diff --git a/toxics/http.go b/toxics/http.go index abf66252..38773927 100644 --- a/toxics/http.go +++ b/toxics/http.go @@ -3,7 +3,6 @@ package toxics import ( "bufio" "bytes" - "fmt" "io" "net/http" @@ -13,7 +12,11 @@ import ( type HttpToxic struct{} type HttpToxicState struct { - Shared bool + Requests chan *http.Request +} + +func (t *HttpToxic) FilterRequests(req *http.Request) bool { + return req.URL.Path == "/foo" } func (t *HttpToxic) ModifyResponse(resp *http.Response) { @@ -22,9 +25,29 @@ func (t *HttpToxic) ModifyResponse(resp *http.Response) { func (t *HttpToxic) PipeRequest(stub *ToxicStub) { state := stub.State.(*HttpToxicState) - state.Shared = true - // TODO - new(NoopToxic).Pipe(stub) + + buffer := bytes.NewBuffer(make([]byte, 0, 32*1024)) + writer := stream.NewChanWriter(stub.Output) + reader := stream.NewChanReader(stub.Input) + reader.SetInterrupt(stub.Interrupt) + for { + tee := io.TeeReader(reader, buffer) + req, err := http.ReadRequest(bufio.NewReader(tee)) + if err == stream.ErrInterrupted { + buffer.WriteTo(writer) + return + } else if err == io.EOF || err == io.ErrUnexpectedEOF { + stub.Close() + return + } + if err != nil { + buffer.WriteTo(writer) + } else { + state.Requests <- req + req.Write(writer) + } + buffer.Reset() + } } func (t *HttpToxic) Pipe(stub *ToxicStub) { @@ -37,6 +60,7 @@ func (t *HttpToxic) Pipe(stub *ToxicStub) { for { tee := io.TeeReader(reader, buffer) resp, err := http.ReadResponse(bufio.NewReader(tee), nil) + req := <-state.Requests if err == stream.ErrInterrupted { buffer.WriteTo(writer) return @@ -47,8 +71,9 @@ func (t *HttpToxic) Pipe(stub *ToxicStub) { if err != nil { buffer.WriteTo(writer) } else { - fmt.Println("Shared:", state.Shared) // This should be true if the shared state is working - t.ModifyResponse(resp) + if t.FilterRequests(req) { + t.ModifyResponse(resp) + } resp.Write(writer) } buffer.Reset() @@ -56,7 +81,9 @@ func (t *HttpToxic) Pipe(stub *ToxicStub) { } func (t *HttpToxic) NewState() interface{} { - return new(HttpToxicState) + return &HttpToxicState{ + Requests: make(chan *http.Request, 1), + } } func init() { diff --git a/toxics/toxic.go b/toxics/toxic.go index 68764ebc..6e4632a7 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -71,6 +71,7 @@ type ToxicStub struct { Input <-chan *stream.StreamChunk Output chan<- *stream.StreamChunk State interface{} + Toxicity float32 Interrupt chan struct{} running chan struct{} closed chan struct{} @@ -82,6 +83,7 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream closed: make(chan struct{}), Input: input, Output: output, + Toxicity: rand.Float32(), } } @@ -90,8 +92,8 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream func (s *ToxicStub) Run(toxic *ToxicWrapper) { s.running = make(chan struct{}) defer close(s.running) - // TODO: How will toxicity work with bidirectional toxics? - if rand.Float32() < toxic.Toxicity { + + if s.Toxicity < toxic.Toxicity { if toxic.PairedToxic == nil || toxic.Direction == stream.Downstream { toxic.Pipe(s) } else { From e82d648e16ddd400f05cbc5e408f225c115ec5ce Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Wed, 14 Sep 2016 15:42:32 -0400 Subject: [PATCH 3/7] Add bidirectional toxic tests --- api.go | 2 +- toxic_collection.go | 4 +- toxics/bidirectional_test.go | 211 +++++++++++++++++++++++++++++++++++ toxics/toxic.go | 11 +- 4 files changed, 218 insertions(+), 10 deletions(-) create mode 100644 toxics/bidirectional_test.go diff --git a/api.go b/api.go index 02edd9aa..c4aa400f 100644 --- a/api.go +++ b/api.go @@ -443,7 +443,7 @@ func apiError(resp http.ResponseWriter, err error) bool { type proxyToxics struct { *Proxy - Toxics []toxics.Toxic `json:"toxics"` + Toxics []*toxics.ToxicWrapper `json:"toxics"` } func proxyWithToxics(proxy *Proxy) (result proxyToxics) { diff --git a/toxic_collection.go b/toxic_collection.go index 7cae0a46..b2619c75 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -62,11 +62,11 @@ func (c *ToxicCollection) GetToxic(name string) *toxics.ToxicWrapper { return c.findToxicByName(name) } -func (c *ToxicCollection) GetToxicArray() []toxics.Toxic { +func (c *ToxicCollection) GetToxicArray() []*toxics.ToxicWrapper { c.Lock() defer c.Unlock() - result := make([]toxics.Toxic, 0) + result := make([]*toxics.ToxicWrapper, 0) for dir := range c.chain { for _, toxic := range c.chain[dir] { if len(toxic.Name) > 0 { diff --git a/toxics/bidirectional_test.go b/toxics/bidirectional_test.go new file mode 100644 index 00000000..a2126af0 --- /dev/null +++ b/toxics/bidirectional_test.go @@ -0,0 +1,211 @@ +package toxics_test + +import ( + "bufio" + "bytes" + "math/rand" + "net" + "strconv" + "testing" + + "github.com/Shopify/toxiproxy" + "github.com/Shopify/toxiproxy/stream" + "github.com/Shopify/toxiproxy/toxics" +) + +type EchoToxic struct { + Replace bool `json:"replace"` +} + +type EchoToxicState struct { + Request chan *stream.StreamChunk +} + +func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-stub.Input: + if c == nil { + close(state.Request) + stub.Close() + return + } + state.Request <- c + } + } +} + +func (t *EchoToxic) Pipe(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-state.Request: + if c == nil { + stub.Close() + return + } + if t.Replace { + c.Data = []byte("foobar\n") + } + stub.Output <- c + } + } +} + +func (t *EchoToxic) NewState() interface{} { + return &EchoToxicState{ + Request: make(chan *stream.StreamChunk), + } +} + +func init() { + toxics.Register("echo_test", new(EchoToxic)) +} + +func WithEchoToxic(t *testing.T, existingLink bool, f func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy)) { + WithEchoServer(t, func(upstream string, response chan []byte) { + proxy := NewTestProxy("test", upstream) + proxy.Start() + + // Test the 2 different code paths for adding toxics to existing links vs adding them on link creation. + if !existingLink { + proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "echo_test", "both", &EchoToxic{})) + } + + conn, err := net.Dial("tcp", proxy.Listen) + if err != nil { + t.Error("Unable to dial TCP server", err) + } + + if existingLink { + proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "echo_test", "both", &EchoToxic{})) + } + + f(conn, response, proxy) + + proxy.Stop() + }) +} + +func AssertToxicEchoResponse(t *testing.T, proxy net.Conn, serverResponse chan []byte, expectServer bool) { + msg := []byte("hello " + strconv.Itoa(rand.Int()) + " world\n") + + _, err := proxy.Write(msg) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := append(scan.Bytes(), '\n') + if !bytes.Equal(resp, msg) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!=", string(msg)) + } + + if expectServer { + resp = <-serverResponse + if !bytes.Equal(resp, msg) { + t.Error("Server didn't read correct bytes from client:", string(resp), "!=", string(msg)) + } + } else { + select { + case resp = <-serverResponse: + t.Error("Server got unexpected data from client:", string(resp)) + default: + } + } +} + +func TestAddUpdateRemoveBidirectionalToxic(t *testing.T) { + for existing := 0; existing < 2; existing++ { + WithEchoToxic(t, existing > 0, func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy) { + AssertToxicEchoResponse(t, proxy, response, false) + + proxyServer.Toxics.UpdateToxicJson("echo_test", bytes.NewReader([]byte(`{"attributes": {"replace": true}}`))) + + _, err := proxy.Write([]byte("hello world\n")) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := scan.Bytes() + if !bytes.Equal(resp, []byte("foobar")) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!= foobar") + } + + proxyServer.Toxics.RemoveToxic("echo_test") + + AssertToxicEchoResponse(t, proxy, response, true) + }) + } +} + +func TestBidirectionalToxicOnlyShowsUpOnce(t *testing.T) { + proxy := NewTestProxy("test", "localhost:20001") + proxy.Start() + + toxic, _ := proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "echo_test", "both", &EchoToxic{})) + if toxic.PairedToxic == nil { + t.Fatal("Expected bidirectional toxic to have a paired toxic.") + } else if toxic.PairedToxic.Name != "" || toxic.PairedToxic.Direction != stream.Upstream || toxic.PairedToxic.PairedToxic != toxic { + t.Fatalf("Paired toxic had incorrect values: %+v", toxic.PairedToxic) + } else if toxic.Direction != stream.Downstream { + t.Fatal("Expected toxic to have downstream direction set:", toxic.Direction) + } + + toxics := proxy.Toxics.GetToxicArray() + if len(toxics) != 1 { + t.Fatalf("Wrong number of toxics returned: %d != 1", len(toxics)) + } else if toxics[0].Name != "echo_test" || toxics[0].Stream != "both" { + t.Fatalf("Toxic was not stored as expected: %+v", toxics[0]) + } + + proxy.Stop() +} + +func TestBidirectionalToxicDuplicateName(t *testing.T) { + proxy := NewTestProxy("test", "localhost:20001") + proxy.Start() + + // Test against regular toxics + proxy.Toxics.AddToxicJson(ToxicToJson(t, "foo", "latency", "downstream", &toxics.LatencyToxic{})) + _, err := proxy.Toxics.AddToxicJson(ToxicToJson(t, "foo", "echo_test", "both", &EchoToxic{})) + if err != toxiproxy.ErrToxicAlreadyExists { + t.Fatal("Expected adding toxic to fail due to existing toxic:", err) + } + + // Test against other bidirection toxics + proxy.Toxics.AddToxicJson(ToxicToJson(t, "bar", "echo_test", "both", &EchoToxic{})) + _, err = proxy.Toxics.AddToxicJson(ToxicToJson(t, "bar", "echo_test", "both", &EchoToxic{})) + if err != toxiproxy.ErrToxicAlreadyExists { + t.Fatal("Expected adding toxic to fail due to existing bidirectional toxic:", err) + } + + toxics := proxy.Toxics.GetToxicArray() + if len(toxics) != 2 { + t.Fatalf("Wrong number of toxics returned: %d != 2", len(toxics)) + } else if toxics[0].Name != "foo" || toxics[0].Type != "latency" || toxics[0].Stream != "downstream" { + t.Fatalf("Latency toxic was not stored as expected: %+v", toxics[0]) + } else if toxics[1].Name != "bar" || toxics[1].Type != "echo_test" || toxics[1].Stream != "both" { + t.Fatalf("Bidirectional toxic was not stored as expected: %+v", toxics[1]) + } + + proxy.Stop() +} + +// TODO: Test toxicity on bidirectional toxics diff --git a/toxics/toxic.go b/toxics/toxic.go index 6e4632a7..c6102a20 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -45,13 +45,10 @@ type StatefulToxic interface { NewState() interface{} } -// TODO Test everything about bidirectional toxics -// - Toxicity -// - Duplicate names -// - Hidden toxic pair -// - Add/Update/Remove -// - State +// Bidirectional toxics operate on both TCP streams and allow state to be shared. +// PipeRequest() will oparate on the upstream, while Pipe() will operate on the downstream. type BidirectionalToxic interface { + // Defines the packet flow through an upstream ToxicStub. Operates the same as Pipe(). PipeRequest(*ToxicStub) } @@ -164,7 +161,7 @@ func New(wrapper *ToxicWrapper) Toxic { wrapper.PairedToxic = &ToxicWrapper{ Toxic: wrapper.Toxic, Type: wrapper.Type, - Toxicity: 1, // TODO how will toxicity work? + Toxicity: wrapper.Toxicity, BufferSize: wrapper.BufferSize, PairedToxic: wrapper, } From 2dc68309af35849c724fa62934c6bca221eeea07 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Wed, 14 Sep 2016 16:32:37 -0400 Subject: [PATCH 4/7] Test Toxicity and fix updating toxicity on bidirectional toxic --- link.go | 3 +- toxic_collection.go | 1 + toxics/bidirectional_test.go | 79 +++++++++++++++++++++++++++++++++++- toxics/toxic_test.go | 2 +- 4 files changed, 81 insertions(+), 4 deletions(-) diff --git a/link.go b/link.go index 26e770cc..5ab08b8c 100644 --- a/link.go +++ b/link.go @@ -108,7 +108,8 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { link.stubs[i].State = stateful.NewState() } else { - link.pairedLink.stubs[toxic.PairedToxic.Index].State = link.stubs[i].State + link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State + link.stubs[i].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity } } diff --git a/toxic_collection.go b/toxic_collection.go index b2619c75..486ca70d 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -166,6 +166,7 @@ func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics. toxic.Toxicity = attrs.Toxicity if toxic.PairedToxic != nil { + toxic.PairedToxic.Toxicity = attrs.Toxicity c.chainUpdateToxic(toxic.PairedToxic) } c.chainUpdateToxic(toxic) diff --git a/toxics/bidirectional_test.go b/toxics/bidirectional_test.go index a2126af0..37105874 100644 --- a/toxics/bidirectional_test.go +++ b/toxics/bidirectional_test.go @@ -3,6 +3,7 @@ package toxics_test import ( "bufio" "bytes" + "fmt" "math/rand" "net" "strconv" @@ -18,11 +19,13 @@ type EchoToxic struct { } type EchoToxicState struct { - Request chan *stream.StreamChunk + Request chan *stream.StreamChunk + UpstreamToxicity float32 } func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) { state := stub.State.(*EchoToxicState) + state.UpstreamToxicity = stub.Toxicity for { select { @@ -54,6 +57,9 @@ func (t *EchoToxic) Pipe(stub *toxics.ToxicStub) { if t.Replace { c.Data = []byte("foobar\n") } + if stub.Toxicity != state.UpstreamToxicity { + c.Data = []byte(fmt.Sprintf("Incorrect toxicity: %f != %f\n", stub.Toxicity, state.UpstreamToxicity)) + } stub.Output <- c } } @@ -208,4 +214,73 @@ func TestBidirectionalToxicDuplicateName(t *testing.T) { proxy.Stop() } -// TODO: Test toxicity on bidirectional toxics +func TestBidirectionalToxicWithStartToxicity(t *testing.T) { + for existing := 0; existing < 2; existing++ { + WithEchoToxic(t, existing > 0, func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy) { + proxyServer.Toxics.RemoveToxic("echo_test") + AssertToxicEchoResponse(t, proxy, response, true) + + proxyServer.Toxics.AddToxicJson(bytes.NewReader([]byte(`{"type": "echo_test", "toxicity": 0.5}`))) + + _, err := proxy.Write([]byte("hello world\n")) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := scan.Bytes() + if !bytes.Equal(resp, []byte("hello world")) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!= hello world") + } + + // Clear out response, it's random if the request made it through or not + select { + case <-response: + default: + } + + proxyServer.Toxics.RemoveToxic("echo_test") + + AssertToxicEchoResponse(t, proxy, response, true) + }) + } +} + +func TestBidirectionalToxicWithUpdatedToxicity(t *testing.T) { + for existing := 0; existing < 2; existing++ { + WithEchoToxic(t, existing > 0, func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy) { + AssertToxicEchoResponse(t, proxy, response, false) + + proxyServer.Toxics.UpdateToxicJson("echo_test", bytes.NewReader([]byte(`{"toxicity": 0.5}`))) + + _, err := proxy.Write([]byte("hello world\n")) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := scan.Bytes() + if !bytes.Equal(resp, []byte("hello world")) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!= hello world") + } + + // Clear out response, it's random if the request made it through or not + select { + case <-response: + default: + } + + proxyServer.Toxics.RemoveToxic("echo_test") + + AssertToxicEchoResponse(t, proxy, response, true) + }) + } +} diff --git a/toxics/toxic_test.go b/toxics/toxic_test.go index 2f5d87d6..d720fe48 100644 --- a/toxics/toxic_test.go +++ b/toxics/toxic_test.go @@ -56,7 +56,7 @@ func WithEchoServer(t *testing.T, f func(string, chan []byte)) { ln.Close() scan := bufio.NewScanner(src) - if scan.Scan() { + for scan.Scan() { received := append(scan.Bytes(), '\n') response <- received From ec964a07e91a229ac8a636a63c2df1f9ad623de8 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Wed, 14 Sep 2016 16:34:42 -0400 Subject: [PATCH 5/7] Remove http toxic, it will be in another PR --- toxics/http.go | 91 -------------------------------------------------- 1 file changed, 91 deletions(-) delete mode 100644 toxics/http.go diff --git a/toxics/http.go b/toxics/http.go deleted file mode 100644 index 38773927..00000000 --- a/toxics/http.go +++ /dev/null @@ -1,91 +0,0 @@ -package toxics - -import ( - "bufio" - "bytes" - "io" - "net/http" - - "github.com/Shopify/toxiproxy/stream" -) - -type HttpToxic struct{} - -type HttpToxicState struct { - Requests chan *http.Request -} - -func (t *HttpToxic) FilterRequests(req *http.Request) bool { - return req.URL.Path == "/foo" -} - -func (t *HttpToxic) ModifyResponse(resp *http.Response) { - resp.Header.Set("Location", "https://github.com/Shopify/toxiproxy") -} - -func (t *HttpToxic) PipeRequest(stub *ToxicStub) { - state := stub.State.(*HttpToxicState) - - buffer := bytes.NewBuffer(make([]byte, 0, 32*1024)) - writer := stream.NewChanWriter(stub.Output) - reader := stream.NewChanReader(stub.Input) - reader.SetInterrupt(stub.Interrupt) - for { - tee := io.TeeReader(reader, buffer) - req, err := http.ReadRequest(bufio.NewReader(tee)) - if err == stream.ErrInterrupted { - buffer.WriteTo(writer) - return - } else if err == io.EOF || err == io.ErrUnexpectedEOF { - stub.Close() - return - } - if err != nil { - buffer.WriteTo(writer) - } else { - state.Requests <- req - req.Write(writer) - } - buffer.Reset() - } -} - -func (t *HttpToxic) Pipe(stub *ToxicStub) { - state := stub.State.(*HttpToxicState) - - buffer := bytes.NewBuffer(make([]byte, 0, 32*1024)) - writer := stream.NewChanWriter(stub.Output) - reader := stream.NewChanReader(stub.Input) - reader.SetInterrupt(stub.Interrupt) - for { - tee := io.TeeReader(reader, buffer) - resp, err := http.ReadResponse(bufio.NewReader(tee), nil) - req := <-state.Requests - if err == stream.ErrInterrupted { - buffer.WriteTo(writer) - return - } else if err == io.EOF || err == io.ErrUnexpectedEOF { - stub.Close() - return - } - if err != nil { - buffer.WriteTo(writer) - } else { - if t.FilterRequests(req) { - t.ModifyResponse(resp) - } - resp.Write(writer) - } - buffer.Reset() - } -} - -func (t *HttpToxic) NewState() interface{} { - return &HttpToxicState{ - Requests: make(chan *http.Request, 1), - } -} - -func init() { - Register("http", new(HttpToxic)) -} From c75bec46cc2a33d473ff768484fc4781cdb76d79 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Wed, 14 Sep 2016 17:21:59 -0400 Subject: [PATCH 6/7] Add bidirectional toxic info to the docs --- CHANGELOG.md | 4 +++ CREATING_TOXICS.md | 87 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24125e08..fd5ecc33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 2.2.0 (unreleased) + +* Add bidirectional toxics #132 + # 2.1.3 * Update `/version` endpoint to also return a charset of utf-8. #204 diff --git a/CREATING_TOXICS.md b/CREATING_TOXICS.md index ab39bc83..ea186a7e 100644 --- a/CREATING_TOXICS.md +++ b/CREATING_TOXICS.md @@ -118,9 +118,18 @@ and how much memory you are comfortable with using. ## Stateful toxics If a toxic needs to store extra information for a connection such as the number of bytes -transferred (See `limit_data` toxic), a state object can be created by implementing the -`StatefulToxic` interface. This interface defines the `NewState()` function that can create -a new state object with default values set. +transferred (See the [limit_data toxic](https://github.com/Shopify/toxiproxy/blob/master/toxics/limit_data.go)), +a state object can be created by implementing the `StatefulToxic` interface. This interface +defines the `NewState()` function that can create a new state object with default values set. + +```go +func (t *ExampleToxic) NewState() interface{} { + return &ExampleToxicState{ + BytesRemaining: t.BytesAllowed, + SomeOtherState: true, + } +} +``` When a stateful toxic is created, the state object will be stored on the `ToxicStub` and can be accessed from `toxic.Pipe()`: @@ -133,6 +142,78 @@ If necessary, some global state can be stored in the toxic struct, which will no instanced per-connection. These fields cannot have a custom default value set and will not be thread-safe, so proper locking or atomic operations will need to be used. +## Bidirectional toxics + +Regular toxics are limited to data flowing in a single direction, so they can't make decisions +for the `downstream` based on a request in the `upstream`. For things like protocol aware toxics +this is a problem. + +Bidirectional toxics allow state to be shared for the `upstream` and `downstream` pipes in a single +toxic implementation. They also ensure direction-specific code is always run on the correct pipe +(a toxic that only works on the `upstream` can't be added to the `downstream`). + +Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeRequest()`. +The implementation is same as a regular toxic, and can be paired with other types such as a stateful toxic. + +One use case of a bidirectional toxic is to mock out the backend server entirely, which is shown below: + +```go +type EchoToxic struct {} + +type EchoToxicState struct { + Request chan *stream.StreamChunk +} + +// PipeRequest handles the upstream direction +func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-stub.Input: + if c == nil { + // Close the downstream when the client closes + close(state.Request) + stub.Close() + return + } + // Send the data to the downstream through the state object + state.Request <- c + } + } +} + +// Pipe() will only handle the downstream on a bidirectional toxic +func (t *EchoToxic) Pipe(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-state.Request: // Read from the upstream instead of the server + if c == nil { + stub.Close() + return + } + stub.Output <- c + } + } +} + +func (t *EchoToxic) NewState() interface{} { + return &EchoToxicState{ + Request: make(chan *stream.StreamChunk), + } +} +``` + +This example will loop back all data send to the server back to the client. Another use case seen +within toxiproxy is to filter http response modifications based on the request URL (See the +[http toxic](https://github.com/Shopify/toxiproxy/tree/master/toxics/http.go)). + ## Using `io.Reader` and `io.Writer` If your toxic involves modifying the data going through a proxy, you can use the `ChanReader` From 0e85ee68c99741f7c1bc31af68d2f4d0ae61faaf Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Tue, 20 Sep 2016 15:24:15 -0400 Subject: [PATCH 7/7] Clean up code and rename PipeRequest to PipeUpstream --- CHANGELOG.md | 2 +- CREATING_TOXICS.md | 6 +++--- link.go | 34 ++++++++++++++++++---------------- toxics/bidirectional_test.go | 2 +- toxics/toxic.go | 6 +++--- 5 files changed, 26 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd5ecc33..0403fdb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # 2.2.0 (unreleased) -* Add bidirectional toxics #132 +* Add support for bidirectional toxics (for protocol-aware toxics) #132 # 2.1.3 diff --git a/CREATING_TOXICS.md b/CREATING_TOXICS.md index ea186a7e..c5b0d355 100644 --- a/CREATING_TOXICS.md +++ b/CREATING_TOXICS.md @@ -152,7 +152,7 @@ Bidirectional toxics allow state to be shared for the `upstream` and `downstream toxic implementation. They also ensure direction-specific code is always run on the correct pipe (a toxic that only works on the `upstream` can't be added to the `downstream`). -Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeRequest()`. +Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeUpstream()`. The implementation is same as a regular toxic, and can be paired with other types such as a stateful toxic. One use case of a bidirectional toxic is to mock out the backend server entirely, which is shown below: @@ -164,8 +164,8 @@ type EchoToxicState struct { Request chan *stream.StreamChunk } -// PipeRequest handles the upstream direction -func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) { +// PipeUpstream handles the upstream direction +func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) { state := stub.State.(*EchoToxicState) for { diff --git a/link.go b/link.go index 5ab08b8c..f110f0fb 100644 --- a/link.go +++ b/link.go @@ -67,14 +67,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) link.input.Close() }() for i, toxic := range link.toxics.chain[link.direction] { - if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { - if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { - link.stubs[i].State = stateful.NewState() - } else { - link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State - link.stubs[i].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity - } - } + link.InitPairState(toxic) go link.stubs[i].Run(toxic) } @@ -93,6 +86,22 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) }() } +func (link *ToxicLink) InitPairState(toxic *toxics.ToxicWrapper) { + // If the toxic is stateful, create a state object or copy it from the paired link. + if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { + if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { + link.stubs[toxic.Index].State = stateful.NewState() + } else { + link.stubs[toxic.Index].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State + } + } + + // If the toxic is paired, synchronize the toxicity so they are always in the same state. + if toxic.PairedToxic != nil { + link.stubs[toxic.Index].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity + } +} + // Add a toxic to the end of the chain. func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { i := len(link.stubs) @@ -104,14 +113,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { if link.stubs[i-1].InterruptToxic() { link.stubs[i-1].Output = newin - if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { - if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { - link.stubs[i].State = stateful.NewState() - } else { - link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State - link.stubs[i].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity - } - } + link.InitPairState(toxic) go link.stubs[i].Run(toxic) go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1]) diff --git a/toxics/bidirectional_test.go b/toxics/bidirectional_test.go index 37105874..2c98806c 100644 --- a/toxics/bidirectional_test.go +++ b/toxics/bidirectional_test.go @@ -23,7 +23,7 @@ type EchoToxicState struct { UpstreamToxicity float32 } -func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) { +func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) { state := stub.State.(*EchoToxicState) state.UpstreamToxicity = stub.Toxicity diff --git a/toxics/toxic.go b/toxics/toxic.go index c6102a20..05c2f27e 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -46,10 +46,10 @@ type StatefulToxic interface { } // Bidirectional toxics operate on both TCP streams and allow state to be shared. -// PipeRequest() will oparate on the upstream, while Pipe() will operate on the downstream. +// PipeUpstream() will oparate on the upstream, while Pipe() will operate on the downstream. type BidirectionalToxic interface { // Defines the packet flow through an upstream ToxicStub. Operates the same as Pipe(). - PipeRequest(*ToxicStub) + PipeUpstream(*ToxicStub) } type ToxicWrapper struct { @@ -95,7 +95,7 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) { toxic.Pipe(s) } else { bidirectional := toxic.Toxic.(BidirectionalToxic) - bidirectional.PipeRequest(s) + bidirectional.PipeUpstream(s) } } else { new(NoopToxic).Pipe(s)