Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
__pycache__/
*.py[cod]
*$py.class
poetry.toml

.DS_Store
.envrc
.coverage

.env
.vscode/
.vscode/
.idea/
256 changes: 185 additions & 71 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ types-pyyaml = "^6.0.12"
types-pytz = "^2022.4.0.0"
python-dotenv = "^1.0.1"
numpy = "^2.1.3"
cffi = "^1.17"


[tool.poetry.group.dev.dependencies]
Expand Down
162 changes: 85 additions & 77 deletions pyth_observer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import pprint
from typing import Any, Dict, List, Tuple

from base58 import b58decode
Expand All @@ -24,6 +25,7 @@
from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain
from pyth_observer.dispatch import Dispatch
from pyth_observer.models import Publisher
import pyth_observer.health_server as health_server

PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/"
PYTHTEST_WS_ENDPOINT = "wss://api.pythtest.pyth.network/"
Expand Down Expand Up @@ -72,98 +74,104 @@ def __init__(

async def run(self):
while True:
logger.info("Running checks")

products = await self.get_pyth_products()
coingecko_prices, coingecko_updates = await self.get_coingecko_prices()
crosschain_prices = await self.get_crosschain_prices()

for product in products:
# Skip tombstone accounts with blank metadata
if "base" not in product.attrs:
continue

if not product.first_price_account_key:
continue

# For each product, we build a list of price feed states (one
# for each price account) and a list of publisher states (one
# for each publisher).
states = []
price_accounts = await self.get_pyth_prices(product)

crosschain_price = crosschain_prices.get(
b58decode(product.first_price_account_key.key).hex(), None
)

for _, price_account in price_accounts.items():
# Handle potential None for min_publishers
if (
price_account.min_publishers is None
# When min_publishers is high it means that the price is not production-ready
# yet and it is still being tested. We need no alerting for these prices.
or price_account.min_publishers >= 10
):
try:
logger.info("Running checks")

products = await self.get_pyth_products()
coingecko_prices, coingecko_updates = await self.get_coingecko_prices()
crosschain_prices = await self.get_crosschain_prices()

health_server.observer_ready = True

for product in products:
# Skip tombstone accounts with blank metadata
if "base" not in product.attrs:
continue

if not product.first_price_account_key:
continue

# Ensure latest_block_slot is not None or provide a default value
latest_block_slot = (
price_account.slot if price_account.slot is not None else -1
# For each product, we build a list of price feed states (one
# for each price account) and a list of publisher states (one
# for each publisher).
states = []
price_accounts = await self.get_pyth_prices(product)

crosschain_price = crosschain_prices.get(
b58decode(product.first_price_account_key.key).hex(), None
)

if not price_account.aggregate_price_status:
raise RuntimeError("Price account status is missing")

if not price_account.aggregate_price_info:
raise RuntimeError("Aggregate price info is missing")

states.append(
PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
coingecko_price=coingecko_prices.get(product.attrs["base"]),
coingecko_update=coingecko_updates.get(
product.attrs["base"]
),
crosschain_price=crosschain_price,
for _, price_account in price_accounts.items():
# Handle potential None for min_publishers
if (
price_account.min_publishers is None
# When min_publishers is high it means that the price is not production-ready
# yet and it is still being tested. We need no alerting for these prices.
or price_account.min_publishers >= 10
):
continue

# Ensure latest_block_slot is not None or provide a default value
latest_block_slot = (
price_account.slot if price_account.slot is not None else -1
)
)

for component in price_account.price_components:
pub = self.publishers.get(component.publisher_key.key, None)
publisher_name = (
(pub.name if pub else "")
+ f" ({component.publisher_key.key})"
).strip()
if not price_account.aggregate_price_status:
raise RuntimeError("Price account status is missing")

if not price_account.aggregate_price_info:
raise RuntimeError("Aggregate price info is missing")

states.append(
PublisherState(
publisher_name=publisher_name,
PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=component.publisher_key,
confidence_interval=component.latest_price_info.confidence_interval,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
price=component.latest_price_info.price,
price_aggregate=price_account.aggregate_price_info.price,
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
coingecko_price=coingecko_prices.get(product.attrs["base"]),
coingecko_update=coingecko_updates.get(
product.attrs["base"]
),
crosschain_price=crosschain_price,
)
)

await self.dispatch.run(states)
for component in price_account.price_components:
pub = self.publishers.get(component.publisher_key.key, None)
publisher_name = (
(pub.name if pub else "")
+ f" ({component.publisher_key.key})"
).strip()
states.append(
PublisherState(
publisher_name=publisher_name,
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=component.publisher_key,
confidence_interval=component.latest_price_info.confidence_interval,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
price=component.latest_price_info.price,
price_aggregate=price_account.aggregate_price_info.price,
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
)
)

await self.dispatch.run(states)
except Exception as e:
logger.error(f"Error in run loop: {e}")
health_server.observer_ready = False

logger.debug("Sleeping...")
await asyncio.sleep(5)
Expand Down
11 changes: 10 additions & 1 deletion pyth_observer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from pyth_observer import Observer, Publisher
from pyth_observer.models import ContactInfo
from pyth_observer.health_server import start_health_server


@click.command()
Expand Down Expand Up @@ -61,7 +62,15 @@ def run(config, publishers, coingecko_mapping, prometheus_port):

start_http_server(int(prometheus_port))

asyncio.run(observer.run())
async def main():
# Start health server in background
health_task = asyncio.create_task(start_health_server())
# Run observer
await observer.run()
# Optionally, wait for health server (should run forever)
await health_task

asyncio.run(main())


logger.remove()
Expand Down
26 changes: 26 additions & 0 deletions pyth_observer/health_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio
from aiohttp import web

observer_ready = False

async def live_handler(request):
return web.Response(text="OK")

async def ready_handler(request):
if observer_ready:
return web.Response(text="OK")
else:
return web.Response(status=503, text="Not Ready")

async def start_health_server(port=8080):
app = web.Application()
app.router.add_get("/live", live_handler)
app.router.add_get("/ready", ready_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", port)
await site.start()
# Keep running forever
while True:
await asyncio.sleep(3600)

Loading