Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bots/bot-sniper-1-geyser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ cleanup:
force_close_with_burn: false # Force burning remaining tokens before closing account
with_priority_fee: false # Use priority fees for cleanup transactions

# Node provider configuration (not implemented)
# Node provider configuration
node:
max_rps: 25 # Maximum requests per second
2 changes: 1 addition & 1 deletion bots/bot-sniper-2-logs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ cleanup:
force_close_with_burn: false # Force burning remaining tokens before closing account
with_priority_fee: false # Use priority fees for cleanup transactions

# Node provider configuration (not implemented)
# Node provider configuration
node:
max_rps: 25 # Maximum requests per second
2 changes: 1 addition & 1 deletion bots/bot-sniper-3-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ cleanup:
force_close_with_burn: false # Force burning remaining tokens before closing account
with_priority_fee: false # Use priority fees for cleanup transactions

# Node provider configuration (not implemented)
# Node provider configuration
node:
max_rps: 25 # Maximum requests per second
2 changes: 1 addition & 1 deletion bots/bot-sniper-4-pp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ cleanup:
force_close_with_burn: false # Force burning remaining tokens before closing account
with_priority_fee: false # Use priority fees for cleanup transactions

# Node provider configuration (not implemented)
# Node provider configuration
node:
max_rps: 25 # Maximum requests per second
46 changes: 22 additions & 24 deletions learning-examples/cleanup_accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ async def close_account_if_exists(
):
"""Safely close a token account if it exists and reclaim rent."""
try:
solana_client = await client.get_client()
info = await solana_client.get_account_info(
account, encoding="base64"
) # base64 encoding for account data by deafult
try:
await client.get_account_info(account)
except ValueError:
logger.info(f"Account does not exist or already closed: {account}")
return

# WARNING: This will permanently burn all tokens in the account before closing it
# Closing account is impossible if balance is positive
Expand All @@ -51,26 +52,23 @@ async def close_account_if_exists(
await client.build_and_send_transaction([burn_ix], wallet.keypair)
logger.info(f"Burned tokens from {account}")

# If account exists, attempt to close it
if info.value:
logger.info(f"Closing account: {account}")
close_params = CloseAccountParams(
account=account,
dest=wallet.pubkey,
owner=wallet.pubkey,
program_id=TOKEN_PROGRAM,
)
ix = close_account(close_params)

tx_sig = await client.build_and_send_transaction(
[ix],
wallet.keypair,
skip_preflight=True,
)
await client.confirm_transaction(tx_sig)
logger.info(f"Closed successfully: {account}")
else:
logger.info(f"Account does not exist or already closed: {account}")
# Account exists, attempt to close it
logger.info(f"Closing account: {account}")
close_params = CloseAccountParams(
account=account,
dest=wallet.pubkey,
owner=wallet.pubkey,
program_id=TOKEN_PROGRAM,
)
ix = close_account(close_params)

tx_sig = await client.build_and_send_transaction(
[ix],
wallet.keypair,
skip_preflight=True,
)
await client.confirm_transaction(tx_sig)
logger.info(f"Closed successfully: {account}")

except Exception as e:
logger.error(f"Error while processing account {account}: {e}")
Expand Down
2 changes: 2 additions & 0 deletions src/bot_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ async def start_bot(config_path: str):
yolo_mode=cfg["filters"].get("yolo_mode", False),
# Compute unit configuration
compute_units=cfg.get("compute_units", {}),
# Node provider configuration
max_rps=cfg.get("node", {}).get("max_rps", 25),
)

await trader.start()
Expand Down
10 changes: 6 additions & 4 deletions src/cleanup/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def __init__(
self.use_priority_fee = use_priority_fee
self.close_with_force_burn = force_burn

async def cleanup_ata(self, mint: Pubkey, token_program_id: Pubkey | None = None) -> None:
async def cleanup_ata(
self, mint: Pubkey, token_program_id: Pubkey | None = None
) -> None:
"""
Attempt to burn any remaining tokens and close the ATA.
Skips if account doesn't exist or is already empty/closed.
Expand All @@ -47,7 +49,6 @@ async def cleanup_ata(self, mint: Pubkey, token_program_id: Pubkey | None = None
token_program_id = SystemAddresses.TOKEN_2022_PROGRAM

ata = self.wallet.get_associated_token_address(mint, token_program_id)
solana_client = await self.client.get_client()

priority_fee = (
await self.priority_fee_manager.calculate_priority_fee([ata])
Expand All @@ -59,8 +60,9 @@ async def cleanup_ata(self, mint: Pubkey, token_program_id: Pubkey | None = None
await asyncio.sleep(15)

try:
info = await solana_client.get_account_info(ata, encoding="base64")
if not info.value:
try:
await self.client.get_account_info(ata)
except ValueError:
logger.info(f"ATA {ata} does not exist or already closed.")
return

Expand Down
94 changes: 80 additions & 14 deletions src/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import json
import random
import struct
from typing import Any

Expand All @@ -19,10 +20,13 @@
from solders.pubkey import Pubkey
from solders.transaction import Transaction

from core.rpc_rate_limiter import TokenBucketRateLimiter
from utils.logger import get_logger

logger = get_logger(__name__)

HTTP_TOO_MANY_REQUESTS = 429


def set_loaded_accounts_data_size_limit(bytes_limit: int) -> Instruction:
"""
Expand Down Expand Up @@ -56,11 +60,12 @@ def set_loaded_accounts_data_size_limit(bytes_limit: int) -> Instruction:
class SolanaClient:
"""Abstraction for Solana RPC client operations."""

def __init__(self, rpc_endpoint: str):
def __init__(self, rpc_endpoint: str, max_rps: float = 25.0):
"""Initialize Solana client with RPC endpoint.

Args:
rpc_endpoint: URL of the Solana RPC endpoint
max_rps: Maximum RPC requests per second (rate limiter)
"""
self.rpc_endpoint = rpc_endpoint
self._client = None
Expand All @@ -69,6 +74,8 @@ def __init__(self, rpc_endpoint: str):
self._blockhash_updater_task = asyncio.create_task(
self.start_blockhash_updater()
)
self._rate_limiter = TokenBucketRateLimiter(max_rps=max_rps)
self._session: aiohttp.ClientSession | None = None

async def start_blockhash_updater(self, interval: float = 5.0):
"""Start background task to update recent blockhash."""
Expand Down Expand Up @@ -99,6 +106,18 @@ async def get_client(self) -> AsyncClient:
self._client = AsyncClient(self.rpc_endpoint)
return self._client

async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create the shared aiohttp session.

Returns:
Shared aiohttp.ClientSession instance.
"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10),
)
return self._session

async def close(self):
"""Close the client connection and stop the blockhash updater."""
if self._blockhash_updater_task:
Expand All @@ -112,6 +131,10 @@ async def close(self):
await self._client.close()
self._client = None

if self._session and not self._session.closed:
await self._session.close()
self._session = None

async def get_health(self) -> str | None:
body = {
"jsonrpc": "2.0",
Expand All @@ -135,6 +158,7 @@ async def get_account_info(self, pubkey: Pubkey) -> dict[str, Any]:
Raises:
ValueError: If account doesn't exist or has no data
"""
await self._rate_limiter.acquire()
client = await self.get_client()
response = await client.get_account_info(
pubkey, encoding="base64"
Expand All @@ -152,6 +176,7 @@ async def get_token_account_balance(self, token_account: Pubkey) -> int:
Returns:
Token balance as integer
"""
await self._rate_limiter.acquire()
client = await self.get_client()
response = await client.get_token_account_balance(token_account)
if response.value:
Expand All @@ -164,6 +189,7 @@ async def get_latest_blockhash(self) -> Hash:
Returns:
Recent blockhash as string
"""
await self._rate_limiter.acquire()
client = await self.get_client()
response = await client.get_latest_blockhash(commitment="processed")
return response.value.blockhash
Expand Down Expand Up @@ -230,6 +256,7 @@ async def build_and_send_transaction(

for attempt in range(max_retries):
try:
await self._rate_limiter.acquire()
tx_opts = TxOpts(
skip_preflight=skip_preflight, preflight_commitment=Processed
)
Expand Down Expand Up @@ -261,6 +288,7 @@ async def confirm_transaction(
Returns:
Whether transaction was confirmed
"""
await self._rate_limiter.acquire()
client = await self.get_client()
try:
await client.confirm_transaction(
Expand Down Expand Up @@ -417,28 +445,66 @@ async def _get_transaction_result(self, signature: str) -> dict | None:

return result

async def post_rpc(self, body: dict[str, Any]) -> dict[str, Any] | None:
"""
Send a raw RPC request to the Solana node.
async def post_rpc(
self, body: dict[str, Any], max_retries: int = 3
) -> dict[str, Any] | None:
"""Send a raw RPC request with rate limiting, retry, and 429 handling.

Args:
body: JSON-RPC request body.
max_retries: Maximum number of retry attempts.

Returns:
Optional[Dict[str, Any]]: Parsed JSON response, or None if the request fails.
Parsed JSON response, or None if all attempts fail.
"""
try:
async with aiohttp.ClientSession() as session:
method = body.get("method", "unknown")
session = await self._get_session()

for attempt in range(max_retries):
try:
await self._rate_limiter.acquire()

async with session.post(
self.rpc_endpoint,
json=body,
timeout=aiohttp.ClientTimeout(10), # 10-second timeout
) as response:
if response.status == HTTP_TOO_MANY_REQUESTS:
retry_after = response.headers.get("Retry-After")
if retry_after:
wait_time = float(retry_after)
else:
wait_time = min(2 ** (attempt + 1), 30)
jitter = wait_time * random.uniform(0, 0.25) # noqa: S311
total_wait = wait_time + jitter
logger.warning(
f"RPC rate limited (429) on {method}, "
f"attempt {attempt + 1}/{max_retries}, "
f"waiting {total_wait:.1f}s"
)
await asyncio.sleep(total_wait)
continue

response.raise_for_status()
return await response.json()
except aiohttp.ClientError:
logger.exception("RPC request failed")
return None
except json.JSONDecodeError:
logger.exception("Failed to decode RPC response")
return None

except aiohttp.ClientError:
if attempt == max_retries - 1:
logger.exception(
f"RPC request {method} failed after {max_retries} attempts"
)
return None

wait_time = min(2**attempt, 16)
jitter = wait_time * random.uniform(0, 0.25) # noqa: S311
logger.warning(
f"RPC request {method} failed "
f"(attempt {attempt + 1}/{max_retries}), "
f"retrying in {wait_time + jitter:.1f}s"
)
await asyncio.sleep(wait_time + jitter)

except json.JSONDecodeError:
logger.exception(f"Failed to decode RPC response for {method}")
return None

return None
50 changes: 50 additions & 0 deletions src/core/rpc_rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Token bucket rate limiter for Solana RPC requests."""

import asyncio
import time

from utils.logger import get_logger

logger = get_logger(__name__)


class TokenBucketRateLimiter:
"""Token bucket rate limiter for controlling RPC request rate.

Implements a token bucket algorithm that replenishes tokens at a
fixed rate. Each RPC call consumes one token. When the bucket is
empty, callers wait until a token becomes available.

Args:
max_rps: Maximum requests per second (bucket refill rate).
burst_size: Maximum burst size (bucket capacity). Defaults to max_rps.
"""

def __init__(self, max_rps: float, burst_size: int | None = None) -> None:
self._max_rps = max_rps
self._burst_size = burst_size if burst_size is not None else int(max_rps)
self._tokens = float(self._burst_size)
self._last_refill = time.monotonic()
self._lock = asyncio.Lock()
Comment on lines +24 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Line 28 exceeds the 88-character limit; also validate explicit burst_size.

Two items in __init__:

  1. Line length: Line 28 is ~96 characters, exceeding the 88-char Ruff/project limit.
  2. Missing burst_size guard: If a caller explicitly passes burst_size=0 (or negative), _tokens starts at 0, _refill caps at 0, and acquire() loops forever — same class of bug that was just fixed for max_rps. Since burst_size can be supplied externally, a quick guard keeps the contract tight.
  3. TRY003 (Ruff hint, line 26): The f-string message in ValueError is flagged by Ruff's TRY003 rule. A minor nit — defining a custom message constant or inlining a short message would silence it.
Proposed fix
     def __init__(self, max_rps: float, burst_size: int | None = None) -> None:
         if max_rps <= 0:
-            raise ValueError(f"max_rps must be positive, got {max_rps}")
+            msg = f"max_rps must be positive, got {max_rps}"
+            raise ValueError(msg)
         self._max_rps = max_rps
-        self._burst_size = burst_size if burst_size is not None else max(1, math.ceil(max_rps))
+        self._burst_size = (
+            burst_size if burst_size is not None
+            else max(1, math.ceil(max_rps))
+        )
+        if self._burst_size <= 0:
+            msg = f"burst_size must be positive, got {burst_size}"
+            raise ValueError(msg)
         self._tokens = float(self._burst_size)
         self._last_refill = time.monotonic()
         self._lock = asyncio.Lock()

As per coding guidelines: "Limit lines to 88 characters", "Perform comprehensive input validation for externally sourced data", and "Comply with Ruff exception handling rules (BLE, TRY)".

🧰 Tools
🪛 Ruff (0.14.14)

[warning] 26-26: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@src/core/rpc_rate_limiter.py` around lines 24 - 31, The constructor __init__
currently has an overlong line and doesn't validate an explicit burst_size;
update it to enforce burst_size > 0 (raise ValueError with a short non-f-string
message if burst_size <= 0), keep the default behavior burst_size = max(1,
math.ceil(max_rps)) when None, initialize _tokens from the validated
_burst_size, and ensure any long expressions are split or assigned to
temporaries so no line exceeds the 88-character limit; refer to symbols
__init__, _burst_size, _tokens, and _last_refill when making the changes.


async def acquire(self) -> None:
"""Acquire a token, waiting if the bucket is empty."""
while True:
async with self._lock:
self._refill()
if self._tokens >= 1.0:
self._tokens -= 1.0
return
wait_time = (1.0 - self._tokens) / self._max_rps

await asyncio.sleep(wait_time)

def _refill(self) -> None:
"""Refill tokens based on elapsed time since last refill."""
now = time.monotonic()
elapsed = now - self._last_refill
self._tokens = min(
self._burst_size,
self._tokens + elapsed * self._max_rps,
)
self._last_refill = now
4 changes: 3 additions & 1 deletion src/trading/universal_trader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ def __init__(
yolo_mode: bool = False,
# Compute unit configuration
compute_units: dict | None = None,
# Node provider configuration
max_rps: float = 25.0,
):
"""Initialize the universal trader."""
# Core components
self.solana_client = SolanaClient(rpc_endpoint)
self.solana_client = SolanaClient(rpc_endpoint, max_rps=max_rps)
self.wallet = Wallet(private_key)
self.priority_fee_manager = PriorityFeeManager(
client=self.solana_client,
Expand Down