diff --git a/README.md b/README.md index 013ce77..84751fb 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,17 @@ So please make sure you consult the docs of the node provider you are going to u For Chainstack, all of the details and limits you need to be aware of are consolidated here: [Throughput guidelines](https://docs.chainstack.com/docs/limits) <— we are _always_ keeping this piece up to date so you can rely on it. +### Built-in RPC Rate Limiting + +The bot now includes built-in RPC rate limiting to prevent hitting provider limits: + +- **Token bucket algorithm**: Smoothly controls request rate while allowing short bursts +- **Configurable max RPS**: Set `max_rps` parameter in `SolanaClient` (defaults to 25 RPS) +- **Automatic retry logic**: Handles 429 (Too Many Requests) errors with exponential backoff +- **Shared session management**: Reuses connections for improved performance + +This helps ensure reliable operation within your node provider's rate limits without manual throttling. + ## Changelog Quick note on a couple on a few new scripts in `/learning-examples`: @@ -184,11 +195,11 @@ Also here's a doc on this: [Solana: Listening to pump.fun token mint using only As of April 30, 2025, all changes from **refactored/main-v2** are merged into the **main** version. -| Stage | Feature | Comments | Implementation status +| Stage | Feature | Comments | Implementation status | |-------|---------|----------|---------------------| | **Stage 1: General updates & QoL** | Lib updates | Updating to the latest libraries | ✅ | -| | Error handling | Improving error handling | ✅ | -| | Configurable RPS | Ability to set RPS in the config to match your provider's and plan RPS (preferably [Chainstack](https://console.chainstack.com/) 🤩) | Not started | +| | Error handling | Improving error handling | ✅ | +| | Configurable RPS | Ability to set RPS in the config to match your provider's and plan RPS (preferably [Chainstack](https://console.chainstack.com/) 🤩) | ✅ | | | Dynamic priority fees | Ability to set dynamic priority fees | ✅ | | | Review & optimize `json`, `jsonParsed`, `base64` | Improve speed and traffic for calls, not just `getBlock`. [Helpful overview](https://docs.chainstack.com/docs/solana-optimize-your-getblock-performance#json-jsonparsed-base58-base64).| ✅ | | **Stage 2: Bonding curve and migration management** | `logsSubscribe` integration | Integrate `logsSubscribe` instead of `blockSubscribe` for sniping minted tokens into the main bot | ✅ | diff --git a/bots/bot-sniper-1-geyser.yaml b/bots/bot-sniper-1-geyser.yaml index ac387d3..0772916 100644 --- a/bots/bot-sniper-1-geyser.yaml +++ b/bots/bot-sniper-1-geyser.yaml @@ -92,6 +92,6 @@ cleanup: force_close_with_burn: false # Force burning remaining tokens before closing account with_priority_fee: false # Use priority fees for cleanup transactions -# Node provider configuration (not implemented) +# Node provider configuration node: max_rps: 25 # Maximum requests per second diff --git a/bots/bot-sniper-2-logs.yaml b/bots/bot-sniper-2-logs.yaml index 3b50b47..7f4c0b6 100644 --- a/bots/bot-sniper-2-logs.yaml +++ b/bots/bot-sniper-2-logs.yaml @@ -92,6 +92,6 @@ cleanup: force_close_with_burn: false # Force burning remaining tokens before closing account with_priority_fee: false # Use priority fees for cleanup transactions -# Node provider configuration (not implemented) +# Node provider configuration node: max_rps: 25 # Maximum requests per second diff --git a/bots/bot-sniper-3-blocks.yaml b/bots/bot-sniper-3-blocks.yaml index 3701122..96e4525 100644 --- a/bots/bot-sniper-3-blocks.yaml +++ b/bots/bot-sniper-3-blocks.yaml @@ -92,6 +92,6 @@ cleanup: force_close_with_burn: false # Force burning remaining tokens before closing account with_priority_fee: false # Use priority fees for cleanup transactions -# Node provider configuration (not implemented) +# Node provider configuration node: max_rps: 25 # Maximum requests per second diff --git a/bots/bot-sniper-4-pp.yaml b/bots/bot-sniper-4-pp.yaml index 23af58f..cc1f6d8 100644 --- a/bots/bot-sniper-4-pp.yaml +++ b/bots/bot-sniper-4-pp.yaml @@ -90,6 +90,6 @@ cleanup: force_close_with_burn: false # Force burning remaining tokens before closing account with_priority_fee: false # Use priority fees for cleanup transactions -# Node provider configuration (not implemented) +# Node provider configuration node: max_rps: 25 # Maximum requests per second diff --git a/learning-examples/cleanup_accounts.py b/learning-examples/cleanup_accounts.py index 83bb8c3..2189457 100644 --- a/learning-examples/cleanup_accounts.py +++ b/learning-examples/cleanup_accounts.py @@ -29,13 +29,16 @@ async def close_account_if_exists( ): """Safely close a token account if it exists and reclaim rent.""" try: - solana_client = await client.get_client() - info = await solana_client.get_account_info( - account, encoding="base64" - ) # base64 encoding for account data by deafult + try: + await client.get_account_info(account) + except ValueError: + logger.info(f"Account does not exist or already closed: {account}") + return # WARNING: This will permanently burn all tokens in the account before closing it # Closing account is impossible if balance is positive + # Burn + close are combined into a single transaction to avoid race conditions + instructions = [] balance = await client.get_token_account_balance(account) if balance > 0: logger.info(f"Burning {balance} tokens from account {account}...") @@ -48,29 +51,26 @@ async def close_account_if_exists( program_id=TOKEN_PROGRAM, ) ) - await client.build_and_send_transaction([burn_ix], wallet.keypair) - logger.info(f"Burned tokens from {account}") - - # If account exists, attempt to close it - if info.value: - logger.info(f"Closing account: {account}") - close_params = CloseAccountParams( - account=account, - dest=wallet.pubkey, - owner=wallet.pubkey, - program_id=TOKEN_PROGRAM, - ) - ix = close_account(close_params) - - tx_sig = await client.build_and_send_transaction( - [ix], - wallet.keypair, - skip_preflight=True, - ) - await client.confirm_transaction(tx_sig) - logger.info(f"Closed successfully: {account}") - else: - logger.info(f"Account does not exist or already closed: {account}") + instructions.append(burn_ix) + + # Account exists, attempt to close it + logger.info(f"Closing account: {account}") + close_params = CloseAccountParams( + account=account, + dest=wallet.pubkey, + owner=wallet.pubkey, + program_id=TOKEN_PROGRAM, + ) + instructions.append(close_account(close_params)) + + tx_sig = await client.build_and_send_transaction( + instructions, + wallet.keypair, + skip_preflight=True, + ) + await client.confirm_transaction(tx_sig) + action = "Burned and closed" if balance > 0 else "Closed" + logger.info(f"{action} successfully: {account}") except Exception as e: logger.error(f"Error while processing account {account}: {e}") diff --git a/src/bot_runner.py b/src/bot_runner.py index 4a47e52..c6cd4ae 100644 --- a/src/bot_runner.py +++ b/src/bot_runner.py @@ -137,6 +137,8 @@ async def start_bot(config_path: str): yolo_mode=cfg["filters"].get("yolo_mode", False), # Compute unit configuration compute_units=cfg.get("compute_units", {}), + # Node provider configuration + max_rps=cfg.get("node", {}).get("max_rps", 25), ) await trader.start() diff --git a/src/cleanup/manager.py b/src/cleanup/manager.py index 0da3f17..b545f56 100644 --- a/src/cleanup/manager.py +++ b/src/cleanup/manager.py @@ -34,7 +34,9 @@ def __init__( self.use_priority_fee = use_priority_fee self.close_with_force_burn = force_burn - async def cleanup_ata(self, mint: Pubkey, token_program_id: Pubkey | None = None) -> None: + async def cleanup_ata( + self, mint: Pubkey, token_program_id: Pubkey | None = None + ) -> None: """ Attempt to burn any remaining tokens and close the ATA. Skips if account doesn't exist or is already empty/closed. @@ -47,7 +49,6 @@ async def cleanup_ata(self, mint: Pubkey, token_program_id: Pubkey | None = None token_program_id = SystemAddresses.TOKEN_2022_PROGRAM ata = self.wallet.get_associated_token_address(mint, token_program_id) - solana_client = await self.client.get_client() priority_fee = ( await self.priority_fee_manager.calculate_priority_fee([ata]) @@ -59,8 +60,9 @@ async def cleanup_ata(self, mint: Pubkey, token_program_id: Pubkey | None = None await asyncio.sleep(15) try: - info = await solana_client.get_account_info(ata, encoding="base64") - if not info.value: + try: + await self.client.get_account_info(ata) + except ValueError: logger.info(f"ATA {ata} does not exist or already closed.") return diff --git a/src/core/client.py b/src/core/client.py index 45d6a32..526f735 100644 --- a/src/core/client.py +++ b/src/core/client.py @@ -3,7 +3,7 @@ """ import asyncio -import json +import random import struct from typing import Any @@ -19,10 +19,13 @@ from solders.pubkey import Pubkey from solders.transaction import Transaction +from core.rpc_rate_limiter import TokenBucketRateLimiter from utils.logger import get_logger logger = get_logger(__name__) +HTTP_TOO_MANY_REQUESTS = 429 + def set_loaded_accounts_data_size_limit(bytes_limit: int) -> Instruction: """ @@ -56,11 +59,12 @@ def set_loaded_accounts_data_size_limit(bytes_limit: int) -> Instruction: class SolanaClient: """Abstraction for Solana RPC client operations.""" - def __init__(self, rpc_endpoint: str): + def __init__(self, rpc_endpoint: str, max_rps: float = 25.0): """Initialize Solana client with RPC endpoint. Args: rpc_endpoint: URL of the Solana RPC endpoint + max_rps: Maximum RPC requests per second (rate limiter) """ self.rpc_endpoint = rpc_endpoint self._client = None @@ -69,6 +73,9 @@ def __init__(self, rpc_endpoint: str): self._blockhash_updater_task = asyncio.create_task( self.start_blockhash_updater() ) + self._rate_limiter = TokenBucketRateLimiter(max_rps=max_rps) + self._session: aiohttp.ClientSession | None = None + self._session_lock = asyncio.Lock() async def start_blockhash_updater(self, interval: float = 5.0): """Start background task to update recent blockhash.""" @@ -99,6 +106,19 @@ async def get_client(self) -> AsyncClient: self._client = AsyncClient(self.rpc_endpoint) return self._client + async def _get_session(self) -> aiohttp.ClientSession: + """Get or create the shared aiohttp session. + + Returns: + Shared aiohttp.ClientSession instance. + """ + async with self._session_lock: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=10), + ) + return self._session + async def close(self): """Close the client connection and stop the blockhash updater.""" if self._blockhash_updater_task: @@ -112,6 +132,10 @@ async def close(self): await self._client.close() self._client = None + if self._session and not self._session.closed: + await self._session.close() + self._session = None + async def get_health(self) -> str | None: body = { "jsonrpc": "2.0", @@ -135,6 +159,7 @@ async def get_account_info(self, pubkey: Pubkey) -> dict[str, Any]: Raises: ValueError: If account doesn't exist or has no data """ + await self._rate_limiter.acquire() client = await self.get_client() response = await client.get_account_info( pubkey, encoding="base64" @@ -152,6 +177,7 @@ async def get_token_account_balance(self, token_account: Pubkey) -> int: Returns: Token balance as integer """ + await self._rate_limiter.acquire() client = await self.get_client() response = await client.get_token_account_balance(token_account) if response.value: @@ -164,6 +190,7 @@ async def get_latest_blockhash(self) -> Hash: Returns: Recent blockhash as string """ + await self._rate_limiter.acquire() client = await self.get_client() response = await client.get_latest_blockhash(commitment="processed") return response.value.blockhash @@ -230,6 +257,7 @@ async def build_and_send_transaction( for attempt in range(max_retries): try: + await self._rate_limiter.acquire() tx_opts = TxOpts( skip_preflight=skip_preflight, preflight_commitment=Processed ) @@ -261,6 +289,7 @@ async def confirm_transaction( Returns: Whether transaction was confirmed """ + await self._rate_limiter.acquire() client = await self.get_client() try: await client.confirm_transaction( @@ -417,28 +446,79 @@ async def _get_transaction_result(self, signature: str) -> dict | None: return result - async def post_rpc(self, body: dict[str, Any]) -> dict[str, Any] | None: - """ - Send a raw RPC request to the Solana node. + async def post_rpc( + self, body: dict[str, Any], max_retries: int = 3, max_429_retries: int = 10 + ) -> dict[str, Any] | None: + """Send a raw RPC request with rate limiting, retry, and 429 handling. Args: body: JSON-RPC request body. + max_retries: Maximum number of retry attempts for errors. + max_429_retries: Maximum number of retry attempts for 429 rate limits. Returns: - Optional[Dict[str, Any]]: Parsed JSON response, or None if the request fails. + Parsed JSON response, or None if all attempts fail. """ - try: - async with aiohttp.ClientSession() as session: + method = body.get("method", "unknown") + error_attempts = 0 + rate_limit_attempts = 0 + + while error_attempts < max_retries: + try: + await self._rate_limiter.acquire() + session = await self._get_session() + async with session.post( self.rpc_endpoint, json=body, - timeout=aiohttp.ClientTimeout(10), # 10-second timeout ) as response: + if response.status == HTTP_TOO_MANY_REQUESTS: + rate_limit_attempts += 1 + if rate_limit_attempts >= max_429_retries: + logger.error( + f"RPC rate limited (429) on {method}, " + f"exhausted {max_429_retries} rate-limit retries" + ) + return None + retry_after = response.headers.get("Retry-After") + try: + wait_time = float(retry_after) if retry_after else None + except (ValueError, TypeError): + wait_time = None + if wait_time is None: + wait_time = min(2**rate_limit_attempts, 30) + jitter = wait_time * random.uniform(0, 0.25) # noqa: S311 + total_wait = wait_time + jitter + logger.warning( + f"RPC rate limited (429) on {method}, " + f"429 retry {rate_limit_attempts}/{max_429_retries}, " + f"waiting {total_wait:.1f}s" + ) + await asyncio.sleep(total_wait) + continue + response.raise_for_status() return await response.json() - except aiohttp.ClientError: - logger.exception("RPC request failed") - return None - except json.JSONDecodeError: - logger.exception("Failed to decode RPC response") - return None + + except aiohttp.ContentTypeError: + logger.exception(f"Failed to decode RPC response for {method}") + return None + + except aiohttp.ClientError: + error_attempts += 1 + if error_attempts >= max_retries: + logger.exception( + f"RPC request {method} failed after {max_retries} attempts" + ) + return None + + wait_time = min(2 ** (error_attempts - 1), 16) + jitter = wait_time * random.uniform(0, 0.25) # noqa: S311 + logger.warning( + f"RPC request {method} failed " + f"(attempt {error_attempts}/{max_retries}), " + f"retrying in {wait_time + jitter:.1f}s" + ) + await asyncio.sleep(wait_time + jitter) + + return None diff --git a/src/core/rpc_rate_limiter.py b/src/core/rpc_rate_limiter.py new file mode 100644 index 0000000..af96146 --- /dev/null +++ b/src/core/rpc_rate_limiter.py @@ -0,0 +1,59 @@ +"""Token bucket rate limiter for Solana RPC requests.""" + +import asyncio +import math +import time + +from utils.logger import get_logger + +logger = get_logger(__name__) + + +class TokenBucketRateLimiter: + """Token bucket rate limiter for controlling RPC request rate. + + Implements a token bucket algorithm that replenishes tokens at a + fixed rate. Each RPC call consumes one token. When the bucket is + empty, callers wait until a token becomes available. + + Args: + max_rps: Maximum requests per second (bucket refill rate). Must be positive. + burst_size: Maximum burst size (bucket capacity). Defaults to max_rps. + """ + + def __init__(self, max_rps: float, burst_size: int | None = None) -> None: + if max_rps <= 0: + msg = f"max_rps must be positive, got {max_rps}" + raise ValueError(msg) + self._max_rps = max_rps + self._burst_size = ( + burst_size if burst_size is not None else max(1, math.ceil(max_rps)) + ) + if self._burst_size <= 0: + msg = f"burst_size must be positive, got {burst_size}" + raise ValueError(msg) + self._tokens = float(self._burst_size) + self._last_refill = time.monotonic() + self._lock = asyncio.Lock() + + async def acquire(self) -> None: + """Acquire a token, waiting if the bucket is empty.""" + while True: + async with self._lock: + self._refill() + if self._tokens >= 1.0: + self._tokens -= 1.0 + return + wait_time = (1.0 - self._tokens) / self._max_rps + + await asyncio.sleep(wait_time) + + def _refill(self) -> None: + """Refill tokens based on elapsed time since last refill.""" + now = time.monotonic() + elapsed = now - self._last_refill + self._tokens = min( + self._burst_size, + self._tokens + elapsed * self._max_rps, + ) + self._last_refill = now diff --git a/src/trading/universal_trader.py b/src/trading/universal_trader.py index 6f4350c..e3e5c4f 100644 --- a/src/trading/universal_trader.py +++ b/src/trading/universal_trader.py @@ -85,10 +85,12 @@ def __init__( yolo_mode: bool = False, # Compute unit configuration compute_units: dict | None = None, + # Node provider configuration + max_rps: float = 25.0, ): """Initialize the universal trader.""" # Core components - self.solana_client = SolanaClient(rpc_endpoint) + self.solana_client = SolanaClient(rpc_endpoint, max_rps=max_rps) self.wallet = Wallet(private_key) self.priority_fee_manager = PriorityFeeManager( client=self.solana_client,