Skip to content

Commit c5ecb0f

Browse files
authored
feat(reliability) Add metrics (#91)
1 parent 8ab2e0e commit c5ecb0f

File tree

3 files changed

+437
-75
lines changed

3 files changed

+437
-75
lines changed

pyth_observer/__init__.py

Lines changed: 127 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
from pyth_observer.crosschain import CrosschainPrice
2525
from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain
2626
from pyth_observer.dispatch import Dispatch
27+
from pyth_observer.metrics import metrics
2728
from pyth_observer.models import Publisher
2829

2930
PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/"
@@ -71,7 +72,14 @@ def __init__(
7172
self.crosschain_throttler = Throttler(rate_limit=1, period=1)
7273
self.coingecko_mapping = coingecko_mapping
7374

75+
metrics.set_observer_info(
76+
network=config["network"]["name"],
77+
config=config,
78+
)
79+
7480
async def run(self):
81+
# global states
82+
states = []
7583
while True:
7684
try:
7785
logger.info("Running checks")
@@ -82,6 +90,9 @@ async def run(self):
8290

8391
health_server.observer_ready = True
8492

93+
processed_feeds = 0
94+
active_publishers_by_symbol = {}
95+
8596
for product in products:
8697
# Skip tombstone accounts with blank metadata
8798
if "base" not in product.attrs:
@@ -121,80 +132,136 @@ async def run(self):
121132
if not price_account.aggregate_price_info:
122133
raise RuntimeError("Aggregate price info is missing")
123134

124-
states.append(
125-
PriceFeedState(
126-
symbol=product.attrs["symbol"],
127-
asset_type=product.attrs["asset_type"],
128-
schedule=MarketSchedule(product.attrs["schedule"]),
129-
public_key=price_account.key,
130-
status=price_account.aggregate_price_status,
131-
# this is the solana block slot when price account was fetched
132-
latest_block_slot=latest_block_slot,
133-
latest_trading_slot=price_account.last_slot,
134-
price_aggregate=price_account.aggregate_price_info.price,
135-
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
136-
coingecko_price=coingecko_prices.get(
137-
product.attrs["base"]
138-
),
139-
coingecko_update=coingecko_updates.get(
140-
product.attrs["base"]
141-
),
142-
crosschain_price=crosschain_price,
143-
)
135+
price_feed_state = PriceFeedState(
136+
symbol=product.attrs["symbol"],
137+
asset_type=product.attrs["asset_type"],
138+
schedule=MarketSchedule(product.attrs["schedule"]),
139+
public_key=price_account.key,
140+
status=price_account.aggregate_price_status,
141+
# this is the solana block slot when price account was fetched
142+
latest_block_slot=latest_block_slot,
143+
latest_trading_slot=price_account.last_slot,
144+
price_aggregate=price_account.aggregate_price_info.price,
145+
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
146+
coingecko_price=coingecko_prices.get(product.attrs["base"]),
147+
coingecko_update=coingecko_updates.get(
148+
product.attrs["base"]
149+
),
150+
crosschain_price=crosschain_price,
144151
)
145152

153+
states.append(price_feed_state)
154+
processed_feeds += 1
155+
156+
metrics.update_price_feed_metrics(price_feed_state)
157+
158+
symbol = product.attrs["symbol"]
159+
if symbol not in active_publishers_by_symbol:
160+
active_publishers_by_symbol[symbol] = {
161+
"count": 0,
162+
"asset_type": product.attrs["asset_type"],
163+
}
164+
146165
for component in price_account.price_components:
147166
pub = self.publishers.get(component.publisher_key.key, None)
148167
publisher_name = (
149168
(pub.name if pub else "")
150169
+ f" ({component.publisher_key.key})"
151170
).strip()
152-
states.append(
153-
PublisherState(
154-
publisher_name=publisher_name,
155-
symbol=product.attrs["symbol"],
156-
asset_type=product.attrs["asset_type"],
157-
schedule=MarketSchedule(product.attrs["schedule"]),
158-
public_key=component.publisher_key,
159-
confidence_interval=component.latest_price_info.confidence_interval,
160-
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
161-
price=component.latest_price_info.price,
162-
price_aggregate=price_account.aggregate_price_info.price,
163-
slot=component.latest_price_info.pub_slot,
164-
aggregate_slot=price_account.last_slot,
165-
# this is the solana block slot when price account was fetched
166-
latest_block_slot=latest_block_slot,
167-
status=component.latest_price_info.price_status,
168-
aggregate_status=price_account.aggregate_price_status,
169-
)
171+
172+
publisher_state = PublisherState(
173+
publisher_name=publisher_name,
174+
symbol=product.attrs["symbol"],
175+
asset_type=product.attrs["asset_type"],
176+
schedule=MarketSchedule(product.attrs["schedule"]),
177+
public_key=component.publisher_key,
178+
confidence_interval=component.latest_price_info.confidence_interval,
179+
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
180+
price=component.latest_price_info.price,
181+
price_aggregate=price_account.aggregate_price_info.price,
182+
slot=component.latest_price_info.pub_slot,
183+
aggregate_slot=price_account.last_slot,
184+
# this is the solana block slot when price account was fetched
185+
latest_block_slot=latest_block_slot,
186+
status=component.latest_price_info.price_status,
187+
aggregate_status=price_account.aggregate_price_status,
170188
)
171189

172-
await self.dispatch.run(states)
190+
states.append(publisher_state)
191+
active_publishers_by_symbol[symbol]["count"] += 1
192+
193+
metrics.price_feeds_processed.set(processed_feeds)
194+
195+
for symbol, info in active_publishers_by_symbol.items():
196+
metrics.publishers_active.labels(
197+
symbol=symbol, asset_type=info["asset_type"]
198+
).set(info["count"])
199+
200+
await self.dispatch.run(states)
201+
173202
except Exception as e:
174203
logger.error(f"Error in run loop: {e}")
175204
health_server.observer_ready = False
176-
177-
logger.debug("Sleeping...")
205+
metrics.loop_errors_total.labels(error_type=type(e).__name__).inc()
178206
await asyncio.sleep(5)
179207

180208
async def get_pyth_products(self) -> List[PythProductAccount]:
181209
logger.debug("Fetching Pyth product accounts...")
182210

183-
async with self.pyth_throttler:
184-
return await self.pyth_client.refresh_products()
211+
try:
212+
async with self.pyth_throttler:
213+
with metrics.time_operation(
214+
metrics.api_request_duration, service="pyth", endpoint="products"
215+
):
216+
result = await self.pyth_client.refresh_products()
217+
metrics.api_request_total.labels(
218+
service="pyth", endpoint="products", status="success"
219+
).inc()
220+
return result
221+
except Exception:
222+
metrics.api_request_total.labels(
223+
service="pyth", endpoint="products", status="error"
224+
).inc()
225+
raise
185226

186227
async def get_pyth_prices(
187228
self, product: PythProductAccount
188229
) -> Dict[PythPriceType, PythPriceAccount]:
189230
logger.debug("Fetching Pyth price accounts...")
190231

191-
async with self.pyth_throttler:
192-
return await product.refresh_prices()
232+
try:
233+
async with self.pyth_throttler:
234+
with metrics.time_operation(
235+
metrics.api_request_duration, service="pyth", endpoint="prices"
236+
):
237+
result = await product.refresh_prices()
238+
metrics.api_request_total.labels(
239+
service="pyth", endpoint="prices", status="success"
240+
).inc()
241+
return result
242+
except Exception:
243+
metrics.api_request_total.labels(
244+
service="pyth", endpoint="prices", status="error"
245+
).inc()
246+
raise
193247

194248
async def get_coingecko_prices(self):
195249
logger.debug("Fetching CoinGecko prices...")
196250

197-
data = await get_coingecko_prices(self.coingecko_mapping)
251+
try:
252+
with metrics.time_operation(
253+
metrics.api_request_duration, service="coingecko", endpoint="prices"
254+
):
255+
data = await get_coingecko_prices(self.coingecko_mapping)
256+
metrics.api_request_total.labels(
257+
service="coingecko", endpoint="prices", status="success"
258+
).inc()
259+
except Exception:
260+
metrics.api_request_total.labels(
261+
service="coingecko", endpoint="prices", status="error"
262+
).inc()
263+
raise
264+
198265
prices: Dict[str, float] = {}
199266
updates: Dict[str, int] = {} # Unix timestamps
200267

@@ -205,4 +272,17 @@ async def get_coingecko_prices(self):
205272
return (prices, updates)
206273

207274
async def get_crosschain_prices(self) -> Dict[str, CrosschainPrice]:
208-
return await self.crosschain.get_crosschain_prices()
275+
try:
276+
with metrics.time_operation(
277+
metrics.api_request_duration, service="crosschain", endpoint="prices"
278+
):
279+
result = await self.crosschain.get_crosschain_prices()
280+
metrics.api_request_total.labels(
281+
service="crosschain", endpoint="prices", status="success"
282+
).inc()
283+
return result
284+
except Exception:
285+
metrics.api_request_total.labels(
286+
service="crosschain", endpoint="prices", status="error"
287+
).inc()
288+
raise

pyth_observer/dispatch.py

Lines changed: 50 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@
66
from typing import Any, Awaitable, Dict, List
77

88
from loguru import logger
9-
from prometheus_client import Gauge
109

1110
from pyth_observer.check import Check, State
1211
from pyth_observer.check.price_feed import PRICE_FEED_CHECKS, PriceFeedState
@@ -15,6 +14,7 @@
1514
from pyth_observer.event import LogEvent # Used dynamically
1615
from pyth_observer.event import TelegramEvent # Used dynamically
1716
from pyth_observer.event import Context, Event, ZendutyEvent
17+
from pyth_observer.metrics import metrics
1818
from pyth_observer.zenduty import send_zenduty_alert
1919

2020
assert DatadogEvent
@@ -32,16 +32,6 @@ class Dispatch:
3232
def __init__(self, config, publishers):
3333
self.config = config
3434
self.publishers = publishers
35-
self.price_feed_check_gauge = Gauge(
36-
"price_feed_check_failed",
37-
"Price feed check failure status",
38-
["check", "symbol"],
39-
)
40-
self.publisher_check_gauge = Gauge(
41-
"publisher_check_failed",
42-
"Publisher check failure status",
43-
["check", "symbol", "publisher"],
44-
)
4535
if "ZendutyEvent" in self.config["events"]:
4636
self.open_alerts_file = os.environ["OPEN_ALERTS_FILE"]
4737
self.open_alerts = self.load_alerts()
@@ -98,48 +88,70 @@ async def run(self, states: List[State]):
9888
sent_events.append(event.send())
9989

10090
await asyncio.gather(*sent_events)
91+
92+
metrics.update_alert_metrics(self.open_alerts)
93+
10194
if "ZendutyEvent" in self.config["events"]:
10295
await self.process_zenduty_events(current_time)
10396

10497
def check_price_feed(self, state: PriceFeedState) -> List[Check]:
10598
failed_checks: List[Check] = []
99+
total_checks = 0
100+
passed_checks = 0
106101

107102
for check_class in PRICE_FEED_CHECKS:
108103
config = self.load_config(check_class.__name__, state.symbol)
109-
check = check_class(state, config)
110-
gauge = self.price_feed_check_gauge.labels(
111-
check=check_class.__name__,
112-
symbol=state.symbol,
113-
)
114104

115105
if config["enable"]:
116-
if check.run():
117-
gauge.set(0)
106+
total_checks += 1
107+
check = check_class(state, config)
108+
109+
with metrics.time_operation(
110+
metrics.check_execution_duration, check_type=check_class.__name__
111+
):
112+
check_passed = check.run()
113+
114+
if check_passed:
115+
passed_checks += 1
118116
else:
119117
failed_checks.append(check)
120-
gauge.set(1)
118+
119+
if total_checks > 0:
120+
success_rate = passed_checks / total_checks
121+
metrics.check_success_rate.labels(
122+
check_type="price_feed", symbol=state.symbol
123+
).set(success_rate)
121124

122125
return failed_checks
123126

124127
def check_publisher(self, state: PublisherState) -> List[Check]:
125128
failed_checks: List[Check] = []
129+
total_checks = 0
130+
passed_checks = 0
126131

127132
for check_class in PUBLISHER_CHECKS:
128133
config = self.load_config(check_class.__name__, state.symbol)
129-
check = check_class(state, config)
130-
gauge = self.publisher_check_gauge.labels(
131-
check=check_class.__name__,
132-
symbol=state.symbol,
133-
publisher=self.publishers.get(state.public_key, state.public_key),
134-
)
135134

136135
if config["enable"]:
137-
if check.run():
138-
gauge.set(0)
136+
total_checks += 1
137+
check = check_class(state, config)
138+
139+
with metrics.time_operation(
140+
metrics.check_execution_duration, check_type=check_class.__name__
141+
):
142+
check_passed = check.run()
143+
144+
if check_passed:
145+
passed_checks += 1
139146
else:
140-
gauge.set(1)
141147
failed_checks.append(check)
142148

149+
if total_checks > 0:
150+
success_rate = passed_checks / total_checks
151+
metrics.check_success_rate.labels(
152+
check_type="publisher", symbol=state.symbol
153+
).set(success_rate)
154+
143155
return failed_checks
144156

145157
def load_config(self, check_name: str, symbol: str) -> Dict[str, Any]:
@@ -187,12 +199,16 @@ async def process_zenduty_events(self, current_time):
187199
):
188200
logger.debug(f"Resolving Zenduty alert {identifier}")
189201
resolved = True
202+
190203
if info["sent"]:
191204
response = await send_zenduty_alert(
192205
identifier, identifier, resolved=True
193206
)
194207
if response and 200 <= response.status < 300:
195208
to_remove.append(identifier)
209+
metrics.alerts_sent_total.labels(
210+
alert_type=info["type"], channel="zenduty"
211+
).inc()
196212
else:
197213
to_remove.append(identifier)
198214
# Raise alert if failed > $threshold times within the last 5m window
@@ -216,6 +232,10 @@ async def process_zenduty_events(self, current_time):
216232
event = self.delayed_events.get(key)
217233
if event:
218234
to_alert.append(event.send())
235+
metrics.alerts_sent_total.labels(
236+
alert_type=info["type"],
237+
channel=event_type.lower().replace("event", ""),
238+
).inc()
219239

220240
# Send the alerts that were delayed due to thresholds
221241
await asyncio.gather(*to_alert)
@@ -229,5 +249,7 @@ async def process_zenduty_events(self, current_time):
229249
if self.delayed_events.get(key):
230250
del self.delayed_events[key]
231251

252+
metrics.update_alert_metrics(self.open_alerts)
253+
232254
with open(self.open_alerts_file, "w") as file:
233255
json.dump(self.open_alerts, file)

0 commit comments

Comments
 (0)