diff --git a/CHANGELOG.md b/CHANGELOG.md index 24125e08..0403fdb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 2.2.0 (unreleased) + +* Add support for bidirectional toxics (for protocol-aware toxics) #132 + # 2.1.3 * Update `/version` endpoint to also return a charset of utf-8. #204 diff --git a/CREATING_TOXICS.md b/CREATING_TOXICS.md index ab39bc83..c5b0d355 100644 --- a/CREATING_TOXICS.md +++ b/CREATING_TOXICS.md @@ -118,9 +118,18 @@ and how much memory you are comfortable with using. ## Stateful toxics If a toxic needs to store extra information for a connection such as the number of bytes -transferred (See `limit_data` toxic), a state object can be created by implementing the -`StatefulToxic` interface. This interface defines the `NewState()` function that can create -a new state object with default values set. +transferred (See the [limit_data toxic](https://github.com/Shopify/toxiproxy/blob/master/toxics/limit_data.go)), +a state object can be created by implementing the `StatefulToxic` interface. This interface +defines the `NewState()` function that can create a new state object with default values set. + +```go +func (t *ExampleToxic) NewState() interface{} { + return &ExampleToxicState{ + BytesRemaining: t.BytesAllowed, + SomeOtherState: true, + } +} +``` When a stateful toxic is created, the state object will be stored on the `ToxicStub` and can be accessed from `toxic.Pipe()`: @@ -133,6 +142,78 @@ If necessary, some global state can be stored in the toxic struct, which will no instanced per-connection. These fields cannot have a custom default value set and will not be thread-safe, so proper locking or atomic operations will need to be used. +## Bidirectional toxics + +Regular toxics are limited to data flowing in a single direction, so they can't make decisions +for the `downstream` based on a request in the `upstream`. For things like protocol aware toxics +this is a problem. + +Bidirectional toxics allow state to be shared for the `upstream` and `downstream` pipes in a single +toxic implementation. They also ensure direction-specific code is always run on the correct pipe +(a toxic that only works on the `upstream` can't be added to the `downstream`). + +Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeUpstream()`. +The implementation is same as a regular toxic, and can be paired with other types such as a stateful toxic. + +One use case of a bidirectional toxic is to mock out the backend server entirely, which is shown below: + +```go +type EchoToxic struct {} + +type EchoToxicState struct { + Request chan *stream.StreamChunk +} + +// PipeUpstream handles the upstream direction +func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-stub.Input: + if c == nil { + // Close the downstream when the client closes + close(state.Request) + stub.Close() + return + } + // Send the data to the downstream through the state object + state.Request <- c + } + } +} + +// Pipe() will only handle the downstream on a bidirectional toxic +func (t *EchoToxic) Pipe(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-state.Request: // Read from the upstream instead of the server + if c == nil { + stub.Close() + return + } + stub.Output <- c + } + } +} + +func (t *EchoToxic) NewState() interface{} { + return &EchoToxicState{ + Request: make(chan *stream.StreamChunk), + } +} +``` + +This example will loop back all data send to the server back to the client. Another use case seen +within toxiproxy is to filter http response modifications based on the request URL (See the +[http toxic](https://github.com/Shopify/toxiproxy/tree/master/toxics/http.go)). + ## Using `io.Reader` and `io.Writer` If your toxic involves modifying the data going through a proxy, you can use the `ChanReader` diff --git a/api.go b/api.go index 02edd9aa..c4aa400f 100644 --- a/api.go +++ b/api.go @@ -443,7 +443,7 @@ func apiError(resp http.ResponseWriter, err error) bool { type proxyToxics struct { *Proxy - Toxics []toxics.Toxic `json:"toxics"` + Toxics []*toxics.ToxicWrapper `json:"toxics"` } func proxyWithToxics(proxy *Proxy) (result proxyToxics) { diff --git a/link.go b/link.go index 7a67ae16..f110f0fb 100644 --- a/link.go +++ b/link.go @@ -18,12 +18,13 @@ import ( // Input > ToxicStub > ToxicStub > Output // type ToxicLink struct { - stubs []*toxics.ToxicStub - proxy *Proxy - toxics *ToxicCollection - input *stream.ChanWriter - output *stream.ChanReader - direction stream.Direction + stubs []*toxics.ToxicStub + proxy *Proxy + toxics *ToxicCollection + input *stream.ChanWriter + output *stream.ChanReader + direction stream.Direction + pairedLink *ToxicLink } func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Direction) *ToxicLink { @@ -66,9 +67,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) link.input.Close() }() for i, toxic := range link.toxics.chain[link.direction] { - if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { - link.stubs[i].State = stateful.NewState() - } + link.InitPairState(toxic) go link.stubs[i].Run(toxic) } @@ -87,6 +86,22 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) }() } +func (link *ToxicLink) InitPairState(toxic *toxics.ToxicWrapper) { + // If the toxic is stateful, create a state object or copy it from the paired link. + if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { + if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil { + link.stubs[toxic.Index].State = stateful.NewState() + } else { + link.stubs[toxic.Index].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State + } + } + + // If the toxic is paired, synchronize the toxicity so they are always in the same state. + if toxic.PairedToxic != nil { + link.stubs[toxic.Index].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity + } +} + // Add a toxic to the end of the chain. func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { i := len(link.stubs) @@ -98,9 +113,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { if link.stubs[i-1].InterruptToxic() { link.stubs[i-1].Output = newin - if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { - link.stubs[i].State = stateful.NewState() - } + link.InitPairState(toxic) go link.stubs[i].Run(toxic) go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1]) diff --git a/proxy.go b/proxy.go index 1d165053..f3da6b3a 100644 --- a/proxy.go +++ b/proxy.go @@ -2,13 +2,11 @@ package toxiproxy import ( "errors" + "net" "sync" - "github.com/Shopify/toxiproxy/stream" "github.com/sirupsen/logrus" tomb "gopkg.in/tomb.v1" - - "net" ) // Proxy represents the proxy in its entirity with all its links. The main @@ -176,8 +174,7 @@ func (proxy *Proxy) server() { proxy.connections.list[name+"upstream"] = upstream proxy.connections.list[name+"downstream"] = client proxy.connections.Unlock() - proxy.Toxics.StartLink(name+"upstream", client, upstream, stream.Upstream) - proxy.Toxics.StartLink(name+"downstream", upstream, client, stream.Downstream) + proxy.Toxics.StartLinks(name, client, upstream) } } diff --git a/toxic_collection.go b/toxic_collection.go index 3aded164..486ca70d 100644 --- a/toxic_collection.go +++ b/toxic_collection.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "net" "strings" "sync" @@ -61,18 +62,17 @@ func (c *ToxicCollection) GetToxic(name string) *toxics.ToxicWrapper { return c.findToxicByName(name) } -func (c *ToxicCollection) GetToxicArray() []toxics.Toxic { +func (c *ToxicCollection) GetToxicArray() []*toxics.ToxicWrapper { c.Lock() defer c.Unlock() - result := make([]toxics.Toxic, 0) + result := make([]*toxics.ToxicWrapper, 0) for dir := range c.chain { - for i, toxic := range c.chain[dir] { - if i == 0 { - // Skip the first noop toxic, it should not be visible - continue + for _, toxic := range c.chain[dir] { + if len(toxic.Name) > 0 { + // Skip toxics with no name, they should be hidden + result = append(result, toxic) } - result = append(result, toxic) } } return result @@ -96,20 +96,31 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er return nil, joinError(err, ErrBadRequestBody) } - switch strings.ToLower(wrapper.Stream) { - case "downstream": - wrapper.Direction = stream.Downstream - case "upstream": - wrapper.Direction = stream.Upstream - default: - return nil, ErrInvalidStream + if toxics.New(wrapper) == nil { + return nil, ErrInvalidToxicType } - if wrapper.Name == "" { - wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream) + + if wrapper.PairedToxic == nil { + switch strings.ToLower(wrapper.Stream) { + case "downstream": + wrapper.Direction = stream.Downstream + case "upstream": + wrapper.Direction = stream.Upstream + default: + return nil, ErrInvalidStream + } + } else { + wrapper.Stream = "both" + wrapper.Direction = stream.Downstream + wrapper.PairedToxic.Direction = stream.Upstream } - if toxics.New(wrapper) == nil { - return nil, ErrInvalidToxicType + if wrapper.Name == "" { + if wrapper.PairedToxic != nil { + wrapper.Name = wrapper.Type + } else { + wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream) + } } found := c.findToxicByName(wrapper.Name) @@ -129,6 +140,9 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er } c.chainAddToxic(wrapper) + if wrapper.PairedToxic != nil { + c.chainAddToxic(wrapper.PairedToxic) + } return wrapper, nil } @@ -151,6 +165,10 @@ func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics. } toxic.Toxicity = attrs.Toxicity + if toxic.PairedToxic != nil { + toxic.PairedToxic.Toxicity = attrs.Toxicity + c.chainUpdateToxic(toxic.PairedToxic) + } c.chainUpdateToxic(toxic) return toxic, nil } @@ -163,19 +181,30 @@ func (c *ToxicCollection) RemoveToxic(name string) error { toxic := c.findToxicByName(name) if toxic != nil { + if toxic.PairedToxic != nil { + c.chainRemoveToxic(toxic.PairedToxic) + } c.chainRemoveToxic(toxic) return nil } return ErrToxicNotFound } -func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) { +func (c *ToxicCollection) StartLinks(name string, client, upstream net.Conn) { c.Lock() defer c.Unlock() - link := NewToxicLink(c.proxy, c, direction) - link.Start(name, input, output) - c.links[name] = link + linkUp := NewToxicLink(c.proxy, c, stream.Upstream) + linkDown := NewToxicLink(c.proxy, c, stream.Downstream) + + linkUp.pairedLink = linkDown + linkDown.pairedLink = linkUp + + linkUp.Start(name+"upstream", client, upstream) + linkDown.Start(name+"downstream", upstream, client) + + c.links[name+"upstream"] = linkUp + c.links[name+"downstream"] = linkDown } func (c *ToxicCollection) RemoveLink(name string) { @@ -187,12 +216,8 @@ func (c *ToxicCollection) RemoveLink(name string) { // All following functions assume the lock is already grabbed func (c *ToxicCollection) findToxicByName(name string) *toxics.ToxicWrapper { for dir := range c.chain { - for i, toxic := range c.chain[dir] { - if i == 0 { - // Skip the first noop toxic, it has no name - continue - } - if toxic.Name == name { + for _, toxic := range c.chain[dir] { + if len(toxic.Name) > 0 && toxic.Name == name { return toxic } } diff --git a/toxics/bidirectional_test.go b/toxics/bidirectional_test.go new file mode 100644 index 00000000..2c98806c --- /dev/null +++ b/toxics/bidirectional_test.go @@ -0,0 +1,286 @@ +package toxics_test + +import ( + "bufio" + "bytes" + "fmt" + "math/rand" + "net" + "strconv" + "testing" + + "github.com/Shopify/toxiproxy" + "github.com/Shopify/toxiproxy/stream" + "github.com/Shopify/toxiproxy/toxics" +) + +type EchoToxic struct { + Replace bool `json:"replace"` +} + +type EchoToxicState struct { + Request chan *stream.StreamChunk + UpstreamToxicity float32 +} + +func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + state.UpstreamToxicity = stub.Toxicity + + for { + select { + case <-stub.Interrupt: + return + case c := <-stub.Input: + if c == nil { + close(state.Request) + stub.Close() + return + } + state.Request <- c + } + } +} + +func (t *EchoToxic) Pipe(stub *toxics.ToxicStub) { + state := stub.State.(*EchoToxicState) + + for { + select { + case <-stub.Interrupt: + return + case c := <-state.Request: + if c == nil { + stub.Close() + return + } + if t.Replace { + c.Data = []byte("foobar\n") + } + if stub.Toxicity != state.UpstreamToxicity { + c.Data = []byte(fmt.Sprintf("Incorrect toxicity: %f != %f\n", stub.Toxicity, state.UpstreamToxicity)) + } + stub.Output <- c + } + } +} + +func (t *EchoToxic) NewState() interface{} { + return &EchoToxicState{ + Request: make(chan *stream.StreamChunk), + } +} + +func init() { + toxics.Register("echo_test", new(EchoToxic)) +} + +func WithEchoToxic(t *testing.T, existingLink bool, f func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy)) { + WithEchoServer(t, func(upstream string, response chan []byte) { + proxy := NewTestProxy("test", upstream) + proxy.Start() + + // Test the 2 different code paths for adding toxics to existing links vs adding them on link creation. + if !existingLink { + proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "echo_test", "both", &EchoToxic{})) + } + + conn, err := net.Dial("tcp", proxy.Listen) + if err != nil { + t.Error("Unable to dial TCP server", err) + } + + if existingLink { + proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "echo_test", "both", &EchoToxic{})) + } + + f(conn, response, proxy) + + proxy.Stop() + }) +} + +func AssertToxicEchoResponse(t *testing.T, proxy net.Conn, serverResponse chan []byte, expectServer bool) { + msg := []byte("hello " + strconv.Itoa(rand.Int()) + " world\n") + + _, err := proxy.Write(msg) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := append(scan.Bytes(), '\n') + if !bytes.Equal(resp, msg) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!=", string(msg)) + } + + if expectServer { + resp = <-serverResponse + if !bytes.Equal(resp, msg) { + t.Error("Server didn't read correct bytes from client:", string(resp), "!=", string(msg)) + } + } else { + select { + case resp = <-serverResponse: + t.Error("Server got unexpected data from client:", string(resp)) + default: + } + } +} + +func TestAddUpdateRemoveBidirectionalToxic(t *testing.T) { + for existing := 0; existing < 2; existing++ { + WithEchoToxic(t, existing > 0, func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy) { + AssertToxicEchoResponse(t, proxy, response, false) + + proxyServer.Toxics.UpdateToxicJson("echo_test", bytes.NewReader([]byte(`{"attributes": {"replace": true}}`))) + + _, err := proxy.Write([]byte("hello world\n")) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := scan.Bytes() + if !bytes.Equal(resp, []byte("foobar")) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!= foobar") + } + + proxyServer.Toxics.RemoveToxic("echo_test") + + AssertToxicEchoResponse(t, proxy, response, true) + }) + } +} + +func TestBidirectionalToxicOnlyShowsUpOnce(t *testing.T) { + proxy := NewTestProxy("test", "localhost:20001") + proxy.Start() + + toxic, _ := proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "echo_test", "both", &EchoToxic{})) + if toxic.PairedToxic == nil { + t.Fatal("Expected bidirectional toxic to have a paired toxic.") + } else if toxic.PairedToxic.Name != "" || toxic.PairedToxic.Direction != stream.Upstream || toxic.PairedToxic.PairedToxic != toxic { + t.Fatalf("Paired toxic had incorrect values: %+v", toxic.PairedToxic) + } else if toxic.Direction != stream.Downstream { + t.Fatal("Expected toxic to have downstream direction set:", toxic.Direction) + } + + toxics := proxy.Toxics.GetToxicArray() + if len(toxics) != 1 { + t.Fatalf("Wrong number of toxics returned: %d != 1", len(toxics)) + } else if toxics[0].Name != "echo_test" || toxics[0].Stream != "both" { + t.Fatalf("Toxic was not stored as expected: %+v", toxics[0]) + } + + proxy.Stop() +} + +func TestBidirectionalToxicDuplicateName(t *testing.T) { + proxy := NewTestProxy("test", "localhost:20001") + proxy.Start() + + // Test against regular toxics + proxy.Toxics.AddToxicJson(ToxicToJson(t, "foo", "latency", "downstream", &toxics.LatencyToxic{})) + _, err := proxy.Toxics.AddToxicJson(ToxicToJson(t, "foo", "echo_test", "both", &EchoToxic{})) + if err != toxiproxy.ErrToxicAlreadyExists { + t.Fatal("Expected adding toxic to fail due to existing toxic:", err) + } + + // Test against other bidirection toxics + proxy.Toxics.AddToxicJson(ToxicToJson(t, "bar", "echo_test", "both", &EchoToxic{})) + _, err = proxy.Toxics.AddToxicJson(ToxicToJson(t, "bar", "echo_test", "both", &EchoToxic{})) + if err != toxiproxy.ErrToxicAlreadyExists { + t.Fatal("Expected adding toxic to fail due to existing bidirectional toxic:", err) + } + + toxics := proxy.Toxics.GetToxicArray() + if len(toxics) != 2 { + t.Fatalf("Wrong number of toxics returned: %d != 2", len(toxics)) + } else if toxics[0].Name != "foo" || toxics[0].Type != "latency" || toxics[0].Stream != "downstream" { + t.Fatalf("Latency toxic was not stored as expected: %+v", toxics[0]) + } else if toxics[1].Name != "bar" || toxics[1].Type != "echo_test" || toxics[1].Stream != "both" { + t.Fatalf("Bidirectional toxic was not stored as expected: %+v", toxics[1]) + } + + proxy.Stop() +} + +func TestBidirectionalToxicWithStartToxicity(t *testing.T) { + for existing := 0; existing < 2; existing++ { + WithEchoToxic(t, existing > 0, func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy) { + proxyServer.Toxics.RemoveToxic("echo_test") + AssertToxicEchoResponse(t, proxy, response, true) + + proxyServer.Toxics.AddToxicJson(bytes.NewReader([]byte(`{"type": "echo_test", "toxicity": 0.5}`))) + + _, err := proxy.Write([]byte("hello world\n")) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := scan.Bytes() + if !bytes.Equal(resp, []byte("hello world")) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!= hello world") + } + + // Clear out response, it's random if the request made it through or not + select { + case <-response: + default: + } + + proxyServer.Toxics.RemoveToxic("echo_test") + + AssertToxicEchoResponse(t, proxy, response, true) + }) + } +} + +func TestBidirectionalToxicWithUpdatedToxicity(t *testing.T) { + for existing := 0; existing < 2; existing++ { + WithEchoToxic(t, existing > 0, func(proxy net.Conn, response chan []byte, proxyServer *toxiproxy.Proxy) { + AssertToxicEchoResponse(t, proxy, response, false) + + proxyServer.Toxics.UpdateToxicJson("echo_test", bytes.NewReader([]byte(`{"toxicity": 0.5}`))) + + _, err := proxy.Write([]byte("hello world\n")) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + scan := bufio.NewScanner(proxy) + if !scan.Scan() { + t.Error("Server unexpectedly closed connection") + } + + resp := scan.Bytes() + if !bytes.Equal(resp, []byte("hello world")) { + t.Error("Client didn't read correct bytes from proxy:", string(resp), "!= hello world") + } + + // Clear out response, it's random if the request made it through or not + select { + case <-response: + default: + } + + proxyServer.Toxics.RemoveToxic("echo_test") + + AssertToxicEchoResponse(t, proxy, response, true) + }) + } +} diff --git a/toxics/toxic.go b/toxics/toxic.go index bccd5f88..05c2f27e 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -31,6 +31,7 @@ type CleanupToxic interface { Cleanup(*ToxicStub) } +// Buffered toxics include a buffer for incoming data so that delays don't block other toxics. type BufferedToxic interface { // Defines the size of buffer this toxic should use GetBufferSize() int @@ -44,21 +45,30 @@ type StatefulToxic interface { NewState() interface{} } +// Bidirectional toxics operate on both TCP streams and allow state to be shared. +// PipeUpstream() will oparate on the upstream, while Pipe() will operate on the downstream. +type BidirectionalToxic interface { + // Defines the packet flow through an upstream ToxicStub. Operates the same as Pipe(). + PipeUpstream(*ToxicStub) +} + type ToxicWrapper struct { - Toxic `json:"attributes"` - Name string `json:"name"` - Type string `json:"type"` - Stream string `json:"stream"` - Toxicity float32 `json:"toxicity"` - Direction stream.Direction `json:"-"` - Index int `json:"-"` - BufferSize int `json:"-"` + Toxic `json:"attributes"` + Name string `json:"name"` + Type string `json:"type"` + Stream string `json:"stream"` + Toxicity float32 `json:"toxicity"` + Direction stream.Direction `json:"-"` + Index int `json:"-"` + BufferSize int `json:"-"` + PairedToxic *ToxicWrapper `json:"-"` } type ToxicStub struct { Input <-chan *stream.StreamChunk Output chan<- *stream.StreamChunk State interface{} + Toxicity float32 Interrupt chan struct{} running chan struct{} closed chan struct{} @@ -70,6 +80,7 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream closed: make(chan struct{}), Input: input, Output: output, + Toxicity: rand.Float32(), } } @@ -78,8 +89,14 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream func (s *ToxicStub) Run(toxic *ToxicWrapper) { s.running = make(chan struct{}) defer close(s.running) - if rand.Float32() < toxic.Toxicity { - toxic.Pipe(s) + + if s.Toxicity < toxic.Toxicity { + if toxic.PairedToxic == nil || toxic.Direction == stream.Downstream { + toxic.Pipe(s) + } else { + bidirectional := toxic.Toxic.(BidirectionalToxic) + bidirectional.PipeUpstream(s) + } } else { new(NoopToxic).Pipe(s) } @@ -140,6 +157,15 @@ func New(wrapper *ToxicWrapper) Toxic { } else { wrapper.BufferSize = 0 } + if _, ok := wrapper.Toxic.(BidirectionalToxic); ok { + wrapper.PairedToxic = &ToxicWrapper{ + Toxic: wrapper.Toxic, + Type: wrapper.Type, + Toxicity: wrapper.Toxicity, + BufferSize: wrapper.BufferSize, + PairedToxic: wrapper, + } + } return wrapper.Toxic } diff --git a/toxics/toxic_test.go b/toxics/toxic_test.go index 2f5d87d6..d720fe48 100644 --- a/toxics/toxic_test.go +++ b/toxics/toxic_test.go @@ -56,7 +56,7 @@ func WithEchoServer(t *testing.T, f func(string, chan []byte)) { ln.Close() scan := bufio.NewScanner(src) - if scan.Scan() { + for scan.Scan() { received := append(scan.Bytes(), '\n') response <- received