From 1ffa8feb5ec0a4ce08e65c0cd622038627986c7a Mon Sep 17 00:00:00 2001 From: ylsGit Date: Tue, 2 Apr 2024 11:32:11 +0800 Subject: [PATCH] use gas_price index/order/limit when load pending tx from pool --- config/default.go | 1 + db/migrations/pool/0014.sql | 5 +++++ docs/config-file/node-config-doc.html | 2 +- docs/config-file/node-config-doc.md | 15 +++++++++++++++ docs/config-file/node-config-schema.json | 5 +++++ pool/interfaces.go | 2 +- pool/pgpoolstorage/pgpoolstorage.go | 16 ++++++++++++---- pool/pool.go | 4 ++-- sequencer/config.go | 3 +++ sequencer/interfaces.go | 2 +- sequencer/mock_pool.go | 2 +- sequencer/sequencer.go | 2 +- 12 files changed, 48 insertions(+), 11 deletions(-) create mode 100644 db/migrations/pool/0014.sql diff --git a/config/default.go b/config/default.go index 53167c1f77..a01462da03 100644 --- a/config/default.go +++ b/config/default.go @@ -151,6 +151,7 @@ StateConsistencyCheckInterval = "5s" Filename = "" Version = 0 Enabled = false +LoadPendingTxsLimit = 0 [SequenceSender] WaitPeriodSendSequence = "5s" diff --git a/db/migrations/pool/0014.sql b/db/migrations/pool/0014.sql new file mode 100644 index 0000000000..4b2bdb0de2 --- /dev/null +++ b/db/migrations/pool/0014.sql @@ -0,0 +1,5 @@ +-- +migrate Up +CREATE INDEX IF NOT EXISTS idx_transaction_gas_price ON pool.transaction (gas_price); + +-- +migrate Down +DROP INDEX IF EXISTS pool.idx_transaction_gas_price; \ No newline at end of file diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index e9e7aa5821..4f17cb51fb 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -52,7 +52,7 @@
"300ms"
 

Default: 0Type: integer

HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.
The Sequencer will halt after it closes the batch equal to this number


Default: falseType: boolean

SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel)


Default: trueType: boolean

SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead
in the processPendingL2Blocks go func


Metrics is the config for the sequencer metrics
Default: "1h0m0s"Type: string

Interval is the interval of time to calculate sequencer metrics


Examples:

"1m"
 
"300ms"
-

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
+

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Default: 0Type: integer

LoadPendingTxsLimit is used to limit amount txs from the db


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
 
"300ms"
 

Default: "5s"Type: string

LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index a78084e23a..ec12036b2c 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -1778,6 +1778,7 @@ CheckLastL2BlockHashOnCloseBatch=true
 | - [StateConsistencyCheckInterval](#Sequencer_StateConsistencyCheckInterval )         | No      | string  | No         | -          | Duration                                                                                         |
 | - [Finalizer](#Sequencer_Finalizer )                                                 | No      | object  | No         | -          | Finalizer's specific config properties                                                           |
 | - [StreamServer](#Sequencer_StreamServer )                                           | No      | object  | No         | -          | StreamServerCfg is the config for the stream server                                              |
+| - [LoadPendingTxsLimit](#Sequencer_LoadPendingTxsLimit )                             | No      | integer | No         | -          | LoadPendingTxsLimit is used to limit amount txs from the db                                      |
 
 ### 10.1. `Sequencer.DeletePoolTxsL1BlockConfirmations`
 
@@ -2388,6 +2389,20 @@ Must be one of:
 UpgradeEtrogBatchNumber=0
 ```
 
+### 10.9. `Sequencer.LoadPendingTxsLimit`
+
+**Type:** : `integer`
+
+**Default:** `0`
+
+**Description:** LoadPendingTxsLimit is used to limit amount txs from the db
+
+**Example setting the default value** (0):
+```
+[Sequencer]
+LoadPendingTxsLimit=0
+```
+
 ## 11. `[SequenceSender]`
 
 **Type:** : `object`
diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json
index b9ac390050..a3f73e2a2a 100644
--- a/docs/config-file/node-config-schema.json
+++ b/docs/config-file/node-config-schema.json
@@ -908,6 +908,11 @@
 					"additionalProperties": false,
 					"type": "object",
 					"description": "StreamServerCfg is the config for the stream server"
+				},
+				"LoadPendingTxsLimit": {
+					"type": "integer",
+					"description": "LoadPendingTxsLimit is used to limit amount txs from the db",
+					"default": 0
 				}
 			},
 			"additionalProperties": false,
diff --git a/pool/interfaces.go b/pool/interfaces.go
index 4a2e9bd992..20db0171c4 100644
--- a/pool/interfaces.go
+++ b/pool/interfaces.go
@@ -21,7 +21,7 @@ type storage interface {
 	GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
 	GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]Transaction, error)
 	GetTxsByStatus(ctx context.Context, state TxStatus, limit uint64) ([]Transaction, error)
-	GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error)
+	GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error)
 	IsTxPending(ctx context.Context, hash common.Hash) (bool, error)
 	SetGasPrices(ctx context.Context, l2GasPrice uint64, l1GasPrice uint64) error
 	DeleteGasPricesHistoryOlderThan(ctx context.Context, date time.Time) error
diff --git a/pool/pgpoolstorage/pgpoolstorage.go b/pool/pgpoolstorage/pgpoolstorage.go
index fbc0aaea62..baab0ce14d 100644
--- a/pool/pgpoolstorage/pgpoolstorage.go
+++ b/pool/pgpoolstorage/pgpoolstorage.go
@@ -172,16 +172,24 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
 }
 
 // GetNonWIPPendingTxs returns an array of transactions
-func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) {
+// limit parameter is used to limit amount txs from the db,
+// if limit = 0, then there is no limit
+func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) {
 	var (
 		rows pgx.Rows
 		err  error
 		sql  string
 	)
 
-	sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
-		used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1`
-	rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
+	if limit == 0 {
+		sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
+		used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC`
+		rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)
+	} else {
+		sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
+		used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC LIMIT $2`
+		rows, err = p.db.Query(ctx, sql, pool.TxStatusPending, limit)
+	}
 
 	if err != nil {
 		return nil, err
diff --git a/pool/pool.go b/pool/pool.go
index 714d7fca05..d8119e755a 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -361,8 +361,8 @@ func (p *Pool) GetPendingTxs(ctx context.Context, limit uint64) ([]Transaction,
 }
 
 // GetNonWIPPendingTxs from the pool
-func (p *Pool) GetNonWIPPendingTxs(ctx context.Context) ([]Transaction, error) {
-	return p.storage.GetNonWIPPendingTxs(ctx)
+func (p *Pool) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]Transaction, error) {
+	return p.storage.GetNonWIPPendingTxs(ctx, limit)
 }
 
 // GetSelectedTxs gets selected txs from the pool db
diff --git a/sequencer/config.go b/sequencer/config.go
index 45210c4840..850369a29e 100644
--- a/sequencer/config.go
+++ b/sequencer/config.go
@@ -30,6 +30,9 @@ type Config struct {
 
 	// StreamServerCfg is the config for the stream server
 	StreamServer StreamServerCfg `mapstructure:"StreamServer"`
+
+	// LoadPendingTxsLimit is used to limit amount txs from the db
+	LoadPendingTxsLimit uint64 `mapstructure:"LoadPendingTxsLimit"`
 }
 
 // StreamServerCfg contains the data streamer's configuration properties
diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go
index afe49bceb9..65a6a8ed46 100644
--- a/sequencer/interfaces.go
+++ b/sequencer/interfaces.go
@@ -20,7 +20,7 @@ type txPool interface {
 	DeleteFailedTransactionsOlderThan(ctx context.Context, date time.Time) error
 	DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
 	MarkWIPTxsAsPending(ctx context.Context) error
-	GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error)
+	GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error)
 	UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool, failedReason *string) error
 	GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error)
 	UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error
diff --git a/sequencer/mock_pool.go b/sequencer/mock_pool.go
index 00bc480699..22d9b44056 100644
--- a/sequencer/mock_pool.go
+++ b/sequencer/mock_pool.go
@@ -180,7 +180,7 @@ func (_m *PoolMock) GetL1AndL2GasPrice() (uint64, uint64) {
 }
 
 // GetNonWIPPendingTxs provides a mock function with given fields: ctx
-func (_m *PoolMock) GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error) {
+func (_m *PoolMock) GetNonWIPPendingTxs(ctx context.Context, limit uint64) ([]pool.Transaction, error) {
 	ret := _m.Called(ctx)
 
 	if len(ret) == 0 {
diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go
index e98c367fc1..4c7e97eff4 100644
--- a/sequencer/sequencer.go
+++ b/sequencer/sequencer.go
@@ -195,7 +195,7 @@ func (s *Sequencer) expireOldWorkerTxs(ctx context.Context) {
 // loadFromPool keeps loading transactions from the pool
 func (s *Sequencer) loadFromPool(ctx context.Context) {
 	for {
-		poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx)
+		poolTransactions, err := s.pool.GetNonWIPPendingTxs(ctx, s.cfg.LoadPendingTxsLimit)
 		if err != nil && err != pool.ErrNotFound {
 			log.Errorf("error loading txs from pool, error: %v", err)
 		}