Skip to content
25 changes: 6 additions & 19 deletions src/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,18 @@
def main() -> None:
# valid chain names: mainnet, xdai, arbitrum_one
chain_name = os.getenv("CHAIN_NAME")
process_imbalances = os.getenv("PROCESS_IMBALANCES", "True") == "True"
process_prices = os.getenv("PROCESS_PRICES", "False") == "True"
process_fees = os.getenv("PROCESS_FEES", "False") == "True"

Comment on lines +11 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using config variables here sounds good.

if chain_name is None:
logger.error("CHAIN_NAME environment variable is not set.")
return

process_imbalances = True
process_fees = False
process_prices = True

web3, db_engine = initialize_connections()
blockchain = BlockchainData(web3)
db = Database(db_engine, chain_name)

if chain_name == "arbitrum_one":
process_imbalances = False
process_prices = False

if chain_name == "xdai":
process_prices = False

processor = TransactionProcessor(
blockchain, db, chain_name, process_imbalances, process_fees, process_prices
chain_name, process_imbalances, process_fees, process_prices
)

start_block = processor.get_start_block()
processor.process(start_block)
processor.run()


if __name__ == "__main__":
Expand Down
52 changes: 37 additions & 15 deletions src/imbalances_script.py → src/raw_imbalances.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,28 @@ def extract_actions(self, traces: list[AttributeDict], address: str) -> list[dic
actions.append(dict(action))
return actions

def calculate_native_eth_imbalance(self, actions: list[dict], address: str) -> int:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change looks like a regression in terms of complexity and code style.

What was the rational?

If the change of return type is required, make only that change here, e.g. via if in_flow == 0 and out_flow == 0: return None.

The docstring should mention what it means if None is returned.

def calculate_native_eth_imbalance(
self, actions: list[dict], address: str
) -> int | None:
"""Extract ETH imbalance from transfer actions."""
# inflow is the total value transferred to address param
inflow = sum(
_to_int(action["value"])
for action in actions
if Web3.to_checksum_address(action.get("to", "")) == address
)
# outflow is the total value transferred out of address param
outflow = sum(
_to_int(action["value"])
for action in actions
if Web3.to_checksum_address(action.get("from", "")) == address
)
return inflow - outflow
native_eth_imbalance = 0
is_it_none = True
for action in actions:
flow = 0
if Web3.to_checksum_address(action.get("to", "")) == address:
flow = _to_int(action["value"])
else:
if Web3.to_checksum_address(action.get("from", "")) == address:
flow = (-1) * _to_int(action["value"])
if flow != 0:
is_it_none = False
native_eth_imbalance += flow

if is_it_none:
return None
else:
return native_eth_imbalance

def extract_events(self, tx_receipt: dict) -> dict[str, list[dict]]:
"""Extract relevant events from the transaction receipt."""
Expand Down Expand Up @@ -297,7 +304,7 @@ def filter_sdai_events(event_list: list[dict], is_deposit: bool) -> None:
filter_sdai_events(events["DepositSDAI"], is_deposit=True)
filter_sdai_events(events["WithdrawSDAI"], is_deposit=False)

def compute_imbalances(self, tx_hash: str) -> dict[str, int]:
def aggregate_imbalances(self, tx_hash: str) -> dict[str, int]:
try:
tx_receipt = self.get_transaction_receipt(tx_hash)
if not tx_receipt:
Expand Down Expand Up @@ -332,14 +339,29 @@ def compute_imbalances(self, tx_hash: str) -> dict[str, int]:
logger.error("Error computing imbalances for %s: %s", tx_hash, e)
raise

def compute_token_imbalances(self, tx_hash: str) -> dict[str, int]:
"""Process token imbalances for a given transaction and return imbalances."""
try:
return self.aggregate_imbalances(tx_hash)
except Exception as e:
logger.error(f"Failed to compute imbalances for transaction {tx_hash}: {e}")
return {}

def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]:
token_imbalances = self.compute_token_imbalances(tx_hash)
transaction_tokens = []
for token_address in token_imbalances.keys():
transaction_tokens.append((tx_hash, token_address))
return transaction_tokens


def main() -> None:
"""main function for finding imbalance for a single tx hash."""
tx_hash = input("Enter transaction hash: ")
chain_name, web3 = find_chain_with_tx(tx_hash)
rt = RawTokenImbalances(web3, chain_name)
try:
imbalances = rt.compute_imbalances(tx_hash)
imbalances = rt.aggregate_imbalances(tx_hash)
if imbalances:
logger.info(f"Token Imbalances on {chain_name}:")
for token_address, imbalance in imbalances.items():
Expand Down
4 changes: 2 additions & 2 deletions src/test_single_hash.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from hexbytes import HexBytes
from web3 import Web3
from src.imbalances_script import RawTokenImbalances
from src.raw_imbalances import RawTokenImbalances
from src.price_providers.price_feed import PriceFeed
from src.fees.compute_fees import compute_all_fees_of_batch
from src.transaction_processor import calculate_slippage
Expand All @@ -25,7 +25,7 @@ def __init__(self):
self.price_providers = PriceFeed()

def compute_data(self, tx_hash: str):
token_imbalances = self.imbalances.compute_imbalances(tx_hash)
token_imbalances = self.imbalances.compute_token_imbalances(tx_hash)
protocol_fees, partner_fees, network_fees = compute_all_fees_of_batch(
HexBytes(tx_hash)
)
Expand Down
Loading