diff --git a/src/daemon.py b/src/daemon.py index 533115a..05d6be4 100644 --- a/src/daemon.py +++ b/src/daemon.py @@ -8,31 +8,18 @@ def main() -> None: # valid chain names: mainnet, xdai, arbitrum_one chain_name = os.getenv("CHAIN_NAME") + process_imbalances = os.getenv("PROCESS_IMBALANCES", "True") == "True" + process_prices = os.getenv("PROCESS_PRICES", "False") == "True" + process_fees = os.getenv("PROCESS_FEES", "False") == "True" + if chain_name is None: logger.error("CHAIN_NAME environment variable is not set.") return - process_imbalances = True - process_fees = False - process_prices = True - - web3, db_engine = initialize_connections() - blockchain = BlockchainData(web3) - db = Database(db_engine, chain_name) - - if chain_name == "arbitrum_one": - process_imbalances = False - process_prices = False - - if chain_name == "xdai": - process_prices = False - processor = TransactionProcessor( - blockchain, db, chain_name, process_imbalances, process_fees, process_prices + chain_name, process_imbalances, process_fees, process_prices ) - - start_block = processor.get_start_block() - processor.process(start_block) + processor.run() if __name__ == "__main__": diff --git a/src/imbalances_script.py b/src/raw_imbalances.py similarity index 90% rename from src/imbalances_script.py rename to src/raw_imbalances.py index baf276a..444ee3e 100644 --- a/src/imbalances_script.py +++ b/src/raw_imbalances.py @@ -121,21 +121,28 @@ def extract_actions(self, traces: list[AttributeDict], address: str) -> list[dic actions.append(dict(action)) return actions - def calculate_native_eth_imbalance(self, actions: list[dict], address: str) -> int: + def calculate_native_eth_imbalance( + self, actions: list[dict], address: str + ) -> int | None: """Extract ETH imbalance from transfer actions.""" # inflow is the total value transferred to address param - inflow = sum( - _to_int(action["value"]) - for action in actions - if Web3.to_checksum_address(action.get("to", "")) == address - ) - # outflow is the total value transferred out of address param - outflow = sum( - _to_int(action["value"]) - for action in actions - if Web3.to_checksum_address(action.get("from", "")) == address - ) - return inflow - outflow + native_eth_imbalance = 0 + is_it_none = True + for action in actions: + flow = 0 + if Web3.to_checksum_address(action.get("to", "")) == address: + flow = _to_int(action["value"]) + else: + if Web3.to_checksum_address(action.get("from", "")) == address: + flow = (-1) * _to_int(action["value"]) + if flow != 0: + is_it_none = False + native_eth_imbalance += flow + + if is_it_none: + return None + else: + return native_eth_imbalance def extract_events(self, tx_receipt: dict) -> dict[str, list[dict]]: """Extract relevant events from the transaction receipt.""" @@ -297,7 +304,7 @@ def filter_sdai_events(event_list: list[dict], is_deposit: bool) -> None: filter_sdai_events(events["DepositSDAI"], is_deposit=True) filter_sdai_events(events["WithdrawSDAI"], is_deposit=False) - def compute_imbalances(self, tx_hash: str) -> dict[str, int]: + def aggregate_imbalances(self, tx_hash: str) -> dict[str, int]: try: tx_receipt = self.get_transaction_receipt(tx_hash) if not tx_receipt: @@ -332,6 +339,21 @@ def compute_imbalances(self, tx_hash: str) -> dict[str, int]: logger.error("Error computing imbalances for %s: %s", tx_hash, e) raise + def compute_token_imbalances(self, tx_hash: str) -> dict[str, int]: + """Process token imbalances for a given transaction and return imbalances.""" + try: + return self.aggregate_imbalances(tx_hash) + except Exception as e: + logger.error(f"Failed to compute imbalances for transaction {tx_hash}: {e}") + return {} + + def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]: + token_imbalances = self.compute_token_imbalances(tx_hash) + transaction_tokens = [] + for token_address in token_imbalances.keys(): + transaction_tokens.append((tx_hash, token_address)) + return transaction_tokens + def main() -> None: """main function for finding imbalance for a single tx hash.""" @@ -339,7 +361,7 @@ def main() -> None: chain_name, web3 = find_chain_with_tx(tx_hash) rt = RawTokenImbalances(web3, chain_name) try: - imbalances = rt.compute_imbalances(tx_hash) + imbalances = rt.aggregate_imbalances(tx_hash) if imbalances: logger.info(f"Token Imbalances on {chain_name}:") for token_address, imbalance in imbalances.items(): diff --git a/src/test_single_hash.py b/src/test_single_hash.py index 3996915..77d01f4 100644 --- a/src/test_single_hash.py +++ b/src/test_single_hash.py @@ -1,6 +1,6 @@ from hexbytes import HexBytes from web3 import Web3 -from src.imbalances_script import RawTokenImbalances +from src.raw_imbalances import RawTokenImbalances from src.price_providers.price_feed import PriceFeed from src.fees.compute_fees import compute_all_fees_of_batch from src.transaction_processor import calculate_slippage @@ -25,7 +25,7 @@ def __init__(self): self.price_providers = PriceFeed() def compute_data(self, tx_hash: str): - token_imbalances = self.imbalances.compute_imbalances(tx_hash) + token_imbalances = self.imbalances.compute_token_imbalances(tx_hash) protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch( HexBytes(tx_hash) ) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index af42ace..0ae12fe 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -5,10 +5,10 @@ from src.fees.compute_fees import compute_all_fees_of_batch from src.helpers.blockchain_data import BlockchainData -from src.helpers.config import CHAIN_SLEEP_TIME, logger +from src.helpers.config import CHAIN_SLEEP_TIME, logger, initialize_connections from src.helpers.database import Database -from src.helpers.helper_functions import read_sql_file, set_params -from src.imbalances_script import RawTokenImbalances +from src.helpers.helper_functions import set_params +from src.raw_imbalances import RawTokenImbalances from src.price_providers.price_feed import PriceFeed from src.token_decimals import update_token_decimals @@ -22,23 +22,58 @@ class TransactionProcessor: def __init__( self, - blockchain_data: BlockchainData, - db: Database, chain_name: str, process_imbalances: bool, process_fees: bool, process_prices: bool, ): - self.blockchain_data = blockchain_data - self.db = db self.chain_name = chain_name self.process_imbalances = process_imbalances self.process_fees = process_fees self.process_prices = process_prices + web3, db_engine = initialize_connections() + self.blockchain_data = BlockchainData(web3) + self.db = Database(db_engine, chain_name) self.imbalances = RawTokenImbalances(self.blockchain_data.web3, self.chain_name) self.price_providers = PriceFeed(activate=process_prices) - self.log_message: list[str] = [] + + ###################### MAIN RUN LOOP ###################### + def run(self) -> None: + """Main Daemon loop that processes txs and computes imbalances, + relevant prices and fees, if needed.""" + + start_block = self.get_start_block() + previous_block = start_block + unprocessed_txs: list[tuple[str, int, int]] = [] + logger.info("%s daemon started. Start block: %d", self.chain_name, start_block) + + while True: + try: + latest_block = self.blockchain_data.get_latest_block() + new_txs = self.blockchain_data.fetch_tx_data( + previous_block, latest_block + ) + all_txs = new_txs + unprocessed_txs + unprocessed_txs.clear() + + for tx_hash, auction_id, block_number in all_txs: + try: + self.process_single_transaction( + tx_hash, auction_id, block_number + ) + except Exception as e: + unprocessed_txs.append((tx_hash, auction_id, block_number)) + logger.error(f"Error processing transaction {tx_hash}: {e}") + + previous_block = latest_block + 1 + time.sleep(CHAIN_SLEEP_TIME) + + except Exception as e: + logger.error(f"Error in processing loop: {e}") + time.sleep(CHAIN_SLEEP_TIME) + + ########################################################### def get_start_block(self) -> int: """ @@ -81,158 +116,48 @@ def get_start_block(self) -> int: return start_block - def process(self, start_block: int) -> None: - """Main Daemon loop that finds imbalances for txs and prices.""" - previous_block = start_block - unprocessed_txs: list[tuple[str, int, int]] = [] - logger.info("%s daemon started. Start block: %d", self.chain_name, start_block) - - while True: - try: - latest_block = self.blockchain_data.get_latest_block() - new_txs = self.blockchain_data.fetch_tx_data( - previous_block, latest_block - ) - all_txs = new_txs + unprocessed_txs - unprocessed_txs.clear() - - for tx_hash, auction_id, block_number in all_txs: - try: - self.process_single_transaction( - tx_hash, auction_id, block_number - ) - except Exception as e: - unprocessed_txs.append((tx_hash, auction_id, block_number)) - logger.error(f"Error processing transaction {tx_hash}: {e}") - - previous_block = latest_block + 1 - time.sleep(CHAIN_SLEEP_TIME) - - except Exception as e: - logger.error(f"Error in processing loop: {e}") - time.sleep(CHAIN_SLEEP_TIME) - def process_single_transaction( - self, tx_hash: str, auction_id: int, block_number: int + self, + tx_hash: str, + auction_id: int, + block_number: int, ) -> None: """Function processes a single tx to find imbalances, fees, prices including writing to database.""" - self.log_message = [] try: - # compute raw token imbalances - token_imbalances = self.process_token_imbalances( - tx_hash, auction_id, block_number - ) + logger.info(f"Processing transaction {tx_hash}") + if self.process_prices: + self.process_tx_prices(tx_hash) - # get transaction timestamp - transaction_timestamp = self.blockchain_data.get_transaction_timestamp( - tx_hash - ) - # store transaction timestamp - self.db.write_transaction_timestamp(transaction_timestamp) - - # get transaction tokens - # transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) - # store transaction tokens - transaction_tokens = [] - for token_address, imbalance in token_imbalances.items(): - if imbalance != 0: - transaction_tokens.append((tx_hash, token_address)) - self.db.write_transaction_tokens(transaction_tokens) - - # update token decimals - update_token_decimals(self.db, self.blockchain_data) - - # get prices - prices_new = self.get_prices_for_tokens( - transaction_timestamp, transaction_tokens - ) - # store prices - self.db.write_prices_new(prices_new) - - # Compute Raw Token Imbalances - # if self.process_imbalances: - # token_imbalances = self.process_token_imbalances( - # tx_hash, auction_id, block_number - # ) - - # # Compute Fees - # if self.process_fees: - # ( - # protocol_fees, - # partner_fees, - # network_fees, - # ) = self.process_fees_for_transaction(tx_hash) - - # # Compute Prices - # if self.process_prices and self.process_imbalances: - # prices = self.process_prices_for_tokens( - # token_imbalances, block_number, tx_hash - # ) - - # Write to database iff no errors in either computations - # if ( - # (not self.process_imbalances) - # and (not self.process_fees) - # and (not self.process_prices) - # ): - # return - - if self.process_imbalances and token_imbalances: - self.handle_imbalances( - token_imbalances, tx_hash, auction_id, block_number - ) - - # if self.process_fees: - # self.handle_fees( - # protocol_fees, - # partner_fees, - # network_fees, - # auction_id, - # block_number, - # tx_hash, - # ) + if self.process_imbalances: + self.process_tx_imbalances(tx_hash, auction_id, block_number) - # if self.process_prices and prices: - # self.handle_prices(prices, tx_hash, block_number) - - logger.info("\n".join(self.log_message)) + if self.process_fees: + self.process_tx_fees(tx_hash, auction_id, block_number) except Exception as err: logger.error(f"An Error occurred: {err}") return - def process_token_imbalances( - self, tx_hash: str, auction_id: int, block_number: int - ) -> dict[str, int]: - """Process token imbalances for a given transaction and return imbalances.""" - try: - token_imbalances = self.imbalances.compute_imbalances(tx_hash) - if token_imbalances: - self.log_message.append( - f"Token Imbalances on {self.chain_name} for tx {tx_hash}:" - ) - return token_imbalances - except Exception as e: - logger.error(f"Failed to compute imbalances for transaction {tx_hash}: {e}") - return {} + def process_tx_prices(self, tx_hash: str) -> None: + # get transaction timestamp + transaction_timestamp = self.blockchain_data.get_transaction_timestamp(tx_hash) + # store transaction timestamp + self.db.write_transaction_timestamp(transaction_timestamp) - def process_fees_for_transaction( - self, - tx_hash: str, - ) -> tuple[ - dict[str, tuple[str, int]], - dict[str, tuple[str, int, str]], - dict[str, tuple[str, int]], - ]: - """Process and return protocol and network fees for a given transaction.""" - try: - protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch( - HexBytes(tx_hash) - ) - return protocol_fees, partner_fees, network_fees - except Exception as e: - logger.error(f"Failed to process fees for transaction {tx_hash}: {e}") - return {}, {}, {} + # get transaction tokens + transaction_tokens = self.imbalances.get_transaction_tokens(tx_hash) + # store transaction tokens + self.db.write_transaction_tokens(transaction_tokens) + + # update token decimals + update_token_decimals(self.db, self.blockchain_data) + + # get prices + prices_new = self.get_prices_for_tokens( + transaction_timestamp, transaction_tokens + ) + # store prices + self.db.write_prices_new(prices_new) def get_prices_for_tokens( self, @@ -267,61 +192,40 @@ def get_prices_for_tokens( return prices - def process_prices_for_tokens( - self, - token_imbalances: dict[str, int], - block_number: int, - tx_hash: str, - ) -> dict[str, tuple[float, str]]: - """Compute prices for tokens with non-null imbalances.""" - prices = {} - try: - for token_address in token_imbalances.keys(): - price_data = self.price_providers.get_price( - set_params(token_address, block_number, tx_hash) - ) - if price_data: - price, source = price_data[0] - prices[token_address] = (price, source) - except Exception as e: - logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") - - return prices + def process_tx_imbalances( + self, tx_hash: str, auction_id: int, block_number: int + ) -> None: + token_imbalances = self.imbalances.compute_token_imbalances(tx_hash) + if token_imbalances: + try: + for token_address, imbalance in token_imbalances.items(): + if imbalance != 0: + self.db.write_token_imbalances( + tx_hash, + auction_id, + block_number, + token_address, + imbalance, + ) + logger.info(f"Token: {token_address}, Imbalance: {imbalance}") + except Exception as err: + logger.error(f"Error: {err}") - def handle_imbalances( + def process_tx_fees( self, - token_imbalances: dict[str, int], tx_hash: str, auction_id: int, block_number: int, ) -> None: - """Function loops over non-null raw imbalances and writes them to the database.""" + """Process protocol, partner and network fees for a given transaction.""" try: - for token_address, imbalance in token_imbalances.items(): - if imbalance != 0: - self.db.write_token_imbalances( - tx_hash, - auction_id, - block_number, - token_address, - imbalance, - ) - self.log_message.append( - f"Token: {token_address}, Imbalance: {imbalance}" - ) - except Exception as err: - logger.error(f"Error: {err}") + protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch( + HexBytes(tx_hash) + ) + except Exception as e: + logger.error(f"Failed to compute fees for transaction {tx_hash}: {e}") + return - def handle_fees( - self, - protocol_fees: dict[str, tuple[str, int]], - partner_fees: dict[str, tuple[str, int, str]], - network_fees: dict[str, tuple[str, int]], - auction_id: int, - block_number: int, - tx_hash: str, - ): - """This function loops over (token, fee) and calls write_fees to write to table.""" try: # Write protocol fees for order_uid, (token_address, fee_amount) in protocol_fees.items(): @@ -370,19 +274,6 @@ def handle_fees( f"Failed to write fees to database for transaction {tx_hash}: {err}" ) - def handle_prices( - self, prices: dict[str, tuple[float, str]], tx_hash: str, block_number: int - ) -> None: - """Function writes prices to table per token.""" - try: - for token_address, (price, source) in prices.items(): - self.db.write_prices( - source, block_number, tx_hash, token_address, price - ) - self.log_message.append(f"Token: {token_address}, Price: {price} ETH") - except Exception as err: - logger.error(f"Error: {err}") - def calculate_slippage( token_imbalances: dict[str, int], diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 370b33d..ee7042b 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -3,6 +3,7 @@ from web3 import Web3 from src.helpers.blockchain_data import BlockchainData +from src.raw_imbalances import RawTokenImbalances def tests_get_tx_hashes_blocks(): @@ -25,22 +26,3 @@ def test_get_transaction_timestamp(): transaction_timestamp = blockchain.get_transaction_timestamp(tx_hash) assert transaction_timestamp == (tx_hash, 1728044411) - - -def test_get_transaction_tokens(): - web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) - blockchain = BlockchainData(web3) - tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - - transaction_tokens = blockchain.get_transaction_tokens(tx_hash) - - assert all(h == tx_hash for h, _ in transaction_tokens) - assert set(token_address for _, token_address in transaction_tokens) == { - "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", - "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", - "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", - "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", - "0xdAC17F958D2ee523a2206206994597C13D831ec7", - "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", - "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", - } diff --git a/tests/e2e/test_imbalances.py b/tests/e2e/test_imbalances.py new file mode 100644 index 0000000..a14250b --- /dev/null +++ b/tests/e2e/test_imbalances.py @@ -0,0 +1,40 @@ +from os import getenv + +from web3 import Web3 + +from src.raw_imbalances import RawTokenImbalances + + +def tests_process_single_transaction(): + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + raw_imbalances = RawTokenImbalances(web3, "mainnet") + imbalances = raw_imbalances.compute_token_imbalances( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + ) + + assert imbalances == { + "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9": 3116463005, + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9": 31552225710415395, + "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee": 0, + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48": 0, + "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": 0, + "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637": 275548164523, + } + + +def test_get_transaction_tokens(): + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + imbalances = RawTokenImbalances(web3, "mainnet") + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_tokens = imbalances.get_transaction_tokens(tx_hash) + + assert all(h == tx_hash for h, _ in transaction_tokens) + assert set(token_address for _, token_address in transaction_tokens) == { + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", + "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", + "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", + "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", + } diff --git a/tests/e2e/test_imbalances_script.py b/tests/e2e/test_imbalances_script.py deleted file mode 100644 index ba0506d..0000000 --- a/tests/e2e/test_imbalances_script.py +++ /dev/null @@ -1,23 +0,0 @@ -from os import getenv - -from web3 import Web3 - -from src.imbalances_script import RawTokenImbalances - - -def tests_process_single_transaction(): - web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) - raw_imbalances = RawTokenImbalances(web3, "mainnet") - imbalances = raw_imbalances.compute_imbalances( - "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" - ) - - assert imbalances == { - "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9": 3116463005, - "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9": 31552225710415395, - "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee": 0, - "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48": 0, - "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": 0, - "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637": 275548164523, - "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE": 0, - } diff --git a/tests/e2e/test_transaction_processor.py b/tests/e2e/test_transaction_processor.py index 8c602ef..a4f51b8 100644 --- a/tests/e2e/test_transaction_processor.py +++ b/tests/e2e/test_transaction_processor.py @@ -9,16 +9,12 @@ def tests_process_single_transaction(): chain_name = "mainnet" - web3, db_engine = initialize_connections() - blockchain = BlockchainData(web3) - db = Database(db_engine, chain_name) - process_imbalances = True process_fees = False process_prices = True processor = TransactionProcessor( - blockchain, db, chain_name, process_imbalances, process_fees, process_prices + chain_name, process_imbalances, process_fees, process_prices ) # delete data diff --git a/tests/legacy/basic_test.py b/tests/legacy/basic_test.py index b01ca9e..99b0b04 100644 --- a/tests/legacy/basic_test.py +++ b/tests/legacy/basic_test.py @@ -2,7 +2,7 @@ import os from dotenv import load_dotenv import pytest -from src.imbalances_script import RawTokenImbalances +from src.raw_imbalances import RawTokenImbalances from src.helpers.helper_functions import get_web3_instance load_dotenv() @@ -54,6 +54,6 @@ def test_imbalances(tx_hash, expected_imbalances): """ chain_name = os.getenv("CHAIN_NAME") rt = RawTokenImbalances(get_web3_instance(), chain_name) - imbalances = rt.compute_imbalances(tx_hash) + imbalances = rt.compute_token_imbalances(tx_hash) for token_address, expected_imbalance in expected_imbalances.items(): assert imbalances.get(token_address) == expected_imbalance diff --git a/tests/legacy/compare_imbalances.py b/tests/legacy/compare_imbalances.py index 669e4bd..1238883 100644 --- a/tests/legacy/compare_imbalances.py +++ b/tests/legacy/compare_imbalances.py @@ -6,7 +6,7 @@ import time from web3 import Web3 from src.helpers.config import ETHEREUM_NODE_URL -from src.imbalances_script import RawTokenImbalances +from src.raw_imbalances import RawTokenImbalances from src.balanceof_imbalances import BalanceOfImbalances from src.daemon import get_web3_instance, create_db_connection, fetch_transaction_hashes