@@ -40,6 +40,12 @@ type Runner struct {
4040 chainCfg inttypes.ChainConfig
4141}
4242
43+ // PreparedTx holds a transaction and its message type
44+ type PreparedTx struct {
45+ TxBytes []byte
46+ MsgType loadtesttypes.MsgType
47+ }
48+
4349// NewRunner creates a new load test runner for a given spec
4450func NewRunner (ctx context.Context , spec loadtesttypes.LoadTestSpec ) (* Runner , error ) {
4551 logger , _ := zap .NewDevelopment ()
@@ -175,7 +181,7 @@ func (r *Runner) calculateMsgGasEstimations(ctx context.Context, client *client.
175181}
176182
177183// buildLoad creates transactions for a single message specification
178- func (r * Runner ) buildLoad (ctx context.Context , msgSpec loadtesttypes.LoadTestMsg ) ([][] byte , error ) {
184+ func (r * Runner ) buildLoad (ctx context.Context , msgSpec loadtesttypes.LoadTestMsg ) ([]PreparedTx , error ) {
179185 fromWallet := r .wallets [rand .Intn (len (r .wallets ))]
180186 client := fromWallet .GetClient ()
181187 walletAddress := fromWallet .FormattedAddress ()
@@ -209,17 +215,17 @@ func (r *Runner) buildLoad(ctx context.Context, msgSpec loadtesttypes.LoadTestMs
209215 return nil , fmt .Errorf ("failed to encode tx: %w" , err )
210216 }
211217
212- return [][] byte { txBytes }, nil
218+ return []PreparedTx {{ TxBytes : txBytes , MsgType : msgSpec . Type } }, nil
213219}
214220
215221// buildFullLoad builds the full transaction load for interval-based sending
216- func (r * Runner ) buildFullLoad (ctx context.Context ) ([][][] byte , error ) {
222+ func (r * Runner ) buildFullLoad (ctx context.Context ) ([][]PreparedTx , error ) {
217223 r .logger .Info ("Building load..." , zap .Int ("num_batches" , r .spec .NumBatches ))
218- batchLoads := make ([][][] byte , 0 , r .spec .NumBatches )
224+ batchLoads := make ([][]PreparedTx , 0 , r .spec .NumBatches )
219225 total := 0
220226
221227 for i := range r .spec .NumBatches {
222- batch := make ([][] byte , 0 )
228+ batch := make ([]PreparedTx , 0 )
223229 for _ , msgSpec := range r .spec .Msgs {
224230 select {
225231 case <- ctx .Done ():
@@ -261,8 +267,6 @@ func (r *Runner) Run(ctx context.Context) (loadtesttypes.LoadTestResult, error)
261267 crank := time .NewTicker (r .spec .SendInterval )
262268 defer crank .Stop ()
263269
264- // sleeping once before we start
265- time .Sleep (2 * time .Second )
266270 startTime := time .Now ()
267271
268272 // load index is the index into the batchLoads slice
@@ -303,14 +307,14 @@ loop:
303307 r .logger .Info ("Sending txs" , zap .Int ("num_txs" , len (load )))
304308
305309 // send each tx in a go routine
306- for i , txBytes := range load {
310+ for i , preparedTx := range load {
307311 wg .Add (1 )
308- go func (txBytes [] byte , index int ) {
312+ go func (preparedTx PreparedTx , index int ) {
309313 defer wg .Done ()
310- sentTx := inttypes.SentTx {MsgType : "unknown" } // TODO: track message types properly
314+ sentTx := inttypes.SentTx {MsgType : preparedTx . MsgType }
311315 // select random client for sending
312316 client := r .clients [rand .Intn (len (r .clients ))]
313- res , err := client .BroadcastTx (ctx , txBytes )
317+ res , err := client .BroadcastTx (ctx , preparedTx . TxBytes )
314318 if err != nil {
315319 r .logger .Error ("failed to send tx" , zap .Error (err ), zap .Int ("index" , index ), zap .Int ("load_index" , loadIndex ))
316320 sentTx .Err = err
@@ -320,7 +324,7 @@ loop:
320324 sentTx .NodeAddress = client .GetNodeAddress ().RPC
321325 }
322326 collectionChannel <- sentTx
323- }(txBytes , i )
327+ }(preparedTx , i )
324328 }
325329
326330 loadIndex ++
@@ -341,8 +345,10 @@ loop:
341345 r .logger .Info ("go routines have completed" , zap .Int ("total_txs" , len (sentTxs )))
342346 r .sentTxs = sentTxs
343347
344- r .logger .Info ("Loadtest complete. Waiting for final txs to complete" )
345- time .Sleep (10 * time .Second )
348+ r .logger .Info ("Loadtest complete. Waiting for mempool to clear" )
349+ r .waitForEmptyMempool (ctx , 1 * time .Minute )
350+ // sleep here for a sec because, even though the mempool may be empty, we could still be in process of executing those txs
351+ time .Sleep (5 * time .Second )
346352
347353 r .logger .Info ("Collecting metrics" , zap .Int ("num_txs" , len (r .sentTxs )))
348354 collectorStartTime := time .Now ()
@@ -420,3 +426,41 @@ func RandomString(n int) string {
420426 }
421427 return string (b )
422428}
429+
430+ func (r * Runner ) waitForEmptyMempool (ctx context.Context , timeout time.Duration ) {
431+ wg := sync.WaitGroup {}
432+ for _ , c := range r .clients {
433+ wg .Add (1 )
434+ go func (client * client.Chain ) {
435+ defer wg .Done ()
436+ cometClient := client .GetCometClient ()
437+
438+ started := time .Now ()
439+ timer := time .NewTicker (500 * time .Millisecond )
440+ timout := time .NewTimer (timeout )
441+ defer timer .Stop ()
442+ defer timout .Stop ()
443+
444+ for {
445+ select {
446+ case <- ctx .Done ():
447+ return
448+ case <- timer .C :
449+ res , err := cometClient .NumUnconfirmedTxs (ctx )
450+ if err == nil {
451+ if res .Count == 0 {
452+ r .logger .Debug ("mempool clear. done waiting for mempool" , zap .Duration ("waited" , time .Since (started )))
453+ return
454+ }
455+ } else {
456+ r .logger .Debug ("error calling mempool status" , zap .Error (err ))
457+ }
458+ case <- timout .C :
459+ r .logger .Debug ("timed out waiting for mempool to clear" , zap .Duration ("waited" , timeout ))
460+ return
461+ }
462+ }
463+ }(c )
464+ }
465+ wg .Wait ()
466+ }
0 commit comments