@@ -21,10 +21,11 @@ import (
21
21
type ToxicCollection struct {
22
22
sync.Mutex
23
23
24
- noop * toxics.ToxicWrapper
25
- proxy * Proxy
26
- chain [][]* toxics.ToxicWrapper
27
- links map [string ]* ToxicLink
24
+ noop * toxics.ToxicWrapper
25
+ proxy * Proxy
26
+ chain [][]* toxics.ToxicWrapper
27
+ toxicConditions [][]* toxics.ToxicCondition
28
+ links map [string ]* ToxicLink
28
29
}
29
30
30
31
func NewToxicCollection (proxy * Proxy ) * ToxicCollection {
@@ -34,13 +35,16 @@ func NewToxicCollection(proxy *Proxy) *ToxicCollection {
34
35
Type : "noop" ,
35
36
Enabled : true ,
36
37
},
37
- proxy : proxy ,
38
- chain : make ([][]* toxics.ToxicWrapper , stream .NumDirections ),
39
- links : make (map [string ]* ToxicLink ),
38
+ proxy : proxy ,
39
+ chain : make ([][]* toxics.ToxicWrapper , stream .NumDirections ),
40
+ toxicConditions : make ([][]* toxics.ToxicCondition , stream .NumDirections ),
41
+ links : make (map [string ]* ToxicLink ),
40
42
}
41
43
for dir := range collection .chain {
42
44
collection .chain [dir ] = make ([]* toxics.ToxicWrapper , 1 , toxics .Count ()+ 1 )
43
45
collection .chain [dir ][0 ] = collection .noop
46
+ collection .toxicConditions [dir ] = make ([]* toxics.ToxicCondition , 1 , toxics .Count ()+ 1 )
47
+ collection .toxicConditions [dir ][0 ] = nil
44
48
}
45
49
return collection
46
50
}
@@ -116,6 +120,8 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er
116
120
// Set the wrapper to be enabled if no condition is specified.
117
121
if wrapper .Condition == nil {
118
122
wrapper .Enabled = true
123
+ } else {
124
+ wrapper .Condition .ToxicWrapper = wrapper
119
125
}
120
126
121
127
// Check if toxic already exists
@@ -208,9 +214,70 @@ func (c *ToxicCollection) StartLink(
208
214
logger = zerolog .Nop ()
209
215
}
210
216
211
- link := NewToxicLink (c .proxy , c , direction , logger )
212
- link .Start (server , name , input , output )
213
- c .links [name ] = link
217
+ // If the direction is upstream, we need to run matchers and update
218
+ // toxics if matched.
219
+ if direction == stream .Upstream {
220
+ // Write input to the matcher writer so that we can match the input
221
+ // in parallel while piping it through the link.
222
+ streamChan := make (chan * stream.StreamChunk )
223
+ streamChanWriter := stream .NewChanWriter (streamChan )
224
+ forkedInput := io .TeeReader (input , streamChanWriter )
225
+
226
+ // Fire of a goroutine to match all conditions separately.
227
+ go c .matchAllToxicConditions (streamChan , direction )
228
+
229
+ link := NewToxicLink (c .proxy , c , direction , logger )
230
+ link .Start (server , name , forkedInput , output )
231
+ c .links [name ] = link
232
+ } else {
233
+ link := NewToxicLink (c .proxy , c , direction , logger )
234
+ link .Start (server , name , input , output )
235
+ c .links [name ] = link
236
+ }
237
+ }
238
+
239
+ // matchAllToxicConditions matches all conditions for a given direction, and updates
240
+ // the toxics if matched.
241
+ func (c * ToxicCollection ) matchAllToxicConditions (
242
+ streamChan chan * stream.StreamChunk ,
243
+ direction stream.Direction ,
244
+ ) {
245
+ c .Lock ()
246
+ defer c .Unlock ()
247
+
248
+ var logger zerolog.Logger
249
+ if c .proxy .Logger != nil {
250
+ logger = * c .proxy .Logger
251
+ } else {
252
+ logger = zerolog .Nop ()
253
+ }
254
+
255
+ streamChunk := <- streamChan
256
+
257
+ // Loop through all conditions and try to match them.
258
+ // If matched, enable the toxic.
259
+ for _ , condition := range c .toxicConditions [direction ] {
260
+ if condition == nil {
261
+ continue
262
+ }
263
+
264
+ matched , err := condition .TryMatch (streamChunk .Data )
265
+ if err != nil {
266
+ logger .Warn ().Err (err ).Msg ("Error matching condition" )
267
+ continue
268
+ }
269
+
270
+ if matched {
271
+ // Get the toxic wrapper from the condition and enable it.
272
+ newToxicWrapper := condition .ToxicWrapper
273
+ newToxicWrapper .Enabled = true
274
+
275
+ // TODO: Do I need to call this? Currently fails when uncommented, though.
276
+ // c.chainUpdateToxic(newToxicWrapper)
277
+ }
278
+ }
279
+
280
+ return
214
281
}
215
282
216
283
func (c * ToxicCollection ) RemoveLink (name string ) {
@@ -236,6 +303,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
236
303
dir := toxic .Direction
237
304
toxic .Index = len (c .chain [dir ])
238
305
c .chain [dir ] = append (c .chain [dir ], toxic )
306
+ c .toxicConditions [dir ] = append (c .toxicConditions [dir ], toxic .Condition )
239
307
240
308
// Asynchronously add the toxic to each link
241
309
wg := sync.WaitGroup {}
@@ -253,6 +321,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
253
321
254
322
func (c * ToxicCollection ) chainUpdateToxic (toxic * toxics.ToxicWrapper ) {
255
323
c.chain [toxic.Direction ][toxic.Index ] = toxic
324
+ c.toxicConditions [toxic.Direction ][toxic.Index ] = toxic .Condition
256
325
257
326
// Asynchronously update the toxic in each link
258
327
group := sync.WaitGroup {}
@@ -279,6 +348,7 @@ func (c *ToxicCollection) chainRemoveToxic(ctx context.Context, toxic *toxics.To
279
348
280
349
dir := toxic .Direction
281
350
c .chain [dir ] = append (c .chain [dir ][:toxic .Index ], c .chain [dir ][toxic .Index + 1 :]... )
351
+ c .toxicConditions [dir ] = append (c .toxicConditions [dir ][:toxic .Index ], c .toxicConditions [dir ][toxic .Index + 1 :]... )
282
352
for i := toxic .Index ; i < len (c .chain [dir ]); i ++ {
283
353
c.chain [dir ][i ].Index = i
284
354
}
0 commit comments