-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathPBData.py
More file actions
4163 lines (3927 loc) · 218 KB
/
PBData.py
File metadata and controls
4163 lines (3927 loc) · 218 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import ast
import psutil
import subprocess
import sys
import os
from contextlib import asynccontextmanager
from pathlib import Path, PurePath
from time import sleep
from datetime import datetime
import platform
import traceback
from pbgui_func import PBGDIR
import json
import re
from pathlib import Path as _Path
import tempfile
from Database import Database
from User import Users
import configparser
from collections import defaultdict
import asyncio
import random
from logging_helpers import human_log as _human_log, set_service_min_level, is_debug_enabled
from Exchange import set_ws_limits, Exchange as _Exchange
from market_data import load_market_data_config, get_daily_hour_coverage_for_dataset, set_enabled_coins
from rate_limit_budget import RateLimitBudget, EXCHANGE_RATE_LIMITS, get_weight
from hyperliquid_best_1m import update_latest_hyperliquid_1m_api_for_coin
from binance_best_1m import update_latest_binance_1m_for_coin
from inventory_cache import refresh_coin as _refresh_inventory_coin
async def _wait_for_flag(flag_path: _Path, timeout: float) -> bool:
"""Wait until flag_path is created (or timeout expires).
Uses Linux inotify so the coroutine sleeps with zero CPU usage and wakes
up immediately when the file appears. Falls back to 5-second polling if
inotify is unavailable (non-Linux or permission error).
Returns True if the flag was detected, False on timeout.
"""
# If flag already exists (e.g. clicked during a run), consume and return immediately.
if flag_path.exists():
try:
flag_path.unlink(missing_ok=True)
except Exception:
pass
return True
import struct as _struct
import ctypes as _ct
try:
_libc = _ct.CDLL(None, use_errno=True)
# inotify_init1(IN_NONBLOCK)
_ifd = _libc.inotify_init1(0o4000)
if _ifd < 0:
raise OSError("inotify_init1 failed")
# Watch the parent directory for file creation / atomic rename
_wd = _libc.inotify_add_watch(
_ifd,
str(flag_path.parent).encode(),
0x100 | 0x80 | 0x08, # IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE
)
if _wd < 0:
os.close(_ifd)
raise OSError("inotify_add_watch failed")
# Check again after setting up the watch (race-free)
if flag_path.exists():
try:
_libc.inotify_rm_watch(_ifd, _wd)
os.close(_ifd)
except Exception:
pass
try:
flag_path.unlink(missing_ok=True)
except Exception:
pass
return True
loop = asyncio.get_running_loop()
fut: asyncio.Future = loop.create_future()
_fname = flag_path.name.encode()
def _readable():
try:
data = os.read(_ifd, 4096)
offset = 0
while offset + 16 <= len(data):
_wd2, _mask, _cookie, nlen = _struct.unpack_from('iIII', data, offset)
name = data[offset + 16: offset + 16 + nlen].rstrip(b'\x00')
offset += 16 + nlen
if name == _fname and not fut.done():
fut.set_result(True)
return
except Exception:
pass
loop.add_reader(_ifd, _readable)
try:
return await asyncio.wait_for(asyncio.shield(fut), timeout=timeout)
except asyncio.TimeoutError:
return False
finally:
loop.remove_reader(_ifd)
try:
_libc.inotify_rm_watch(_ifd, _wd)
os.close(_ifd)
except Exception:
pass
except Exception:
# Fallback: 5-second polling (non-Linux or inotify unavailable)
elapsed = 0.0
while elapsed < timeout:
if flag_path.exists():
try:
flag_path.unlink(missing_ok=True)
except Exception:
pass
return True
await asyncio.sleep(5.0)
elapsed += 5.0
return False
async def _notify_api_balance():
    """Fire-and-forget: POST to FastAPI to fan-out balance_updated to all /ws/dashboard clients.
    Called via asyncio.create_task() after each successful update_balances() write.
    Errors are silently swallowed — a missed notification is not critical.
    """
    import urllib.request
    from pbgui_purefunc import load_ini
    try:
        port_val = load_ini("api_server", "port")
        port = int(port_val) if port_val and str(port_val).isdigit() else 8000
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}/api/internal/notify/balance",
            data=b"",
            method="POST",
        )

        def _post():
            # BUGFIX: close the HTTP response explicitly — urlopen() returns
            # an object holding a socket; the previous code dropped it and
            # leaked the connection until garbage collection.
            with urllib.request.urlopen(req, None, 2) as resp:
                resp.read()

        await asyncio.to_thread(_post)
    except Exception as e:
        try:
            _human_log('PBData', f"[notify] balance notification to API failed: {e}", level='DEBUG')
        except Exception:
            pass
async def _notify_api_positions(user_name: str = ""):
    """Fire-and-forget: POST to FastAPI so chart subscribers get refreshed entry lines.
    Called via asyncio.create_task() after each successful update_positions() write.
    The API server reads the updated DB row and pushes it to all chart WS clients
    watching that user — this ensures the Orders widget clears stale Entry lines even
    when the WS missed the original 'position closed' event.
    Errors are silently swallowed — a missed notification is not critical.
    """
    import urllib.request
    from pbgui_purefunc import load_ini
    try:
        port_val = load_ini("api_server", "port")
        port = int(port_val) if port_val and str(port_val).isdigit() else 8000
        body = json.dumps({"user": user_name}).encode()
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}/api/internal/notify/positions",
            data=body,
            method="POST",
            headers={"Content-Type": "application/json"},
        )

        def _post():
            # BUGFIX: close the HTTP response explicitly — urlopen() returns
            # an object holding a socket; the previous code dropped it and
            # leaked the connection until garbage collection.
            with urllib.request.urlopen(req, None, 2) as resp:
                resp.read()

        await asyncio.to_thread(_post)
    except Exception as e:
        try:
            _human_log('PBData', f"[notify] positions notification to API failed: {e}", level='DEBUG')
        except Exception:
            pass
async def _notify_api_income(user_name: str = ""):
    """Fire-and-forget: POST to FastAPI to fan-out income_updated to all /ws/dashboard clients.
    Called via asyncio.create_task() after each successful update_history() write.
    user_name is forwarded so clients can filter and only reload for their configured user(s).
    Errors are silently swallowed — a missed notification is not critical.
    """
    import urllib.request
    from pbgui_purefunc import load_ini
    try:
        port_val = load_ini("api_server", "port")
        port = int(port_val) if port_val and str(port_val).isdigit() else 8000
        body = json.dumps({"user": user_name}).encode()
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}/api/internal/notify/income",
            data=body,
            method="POST",
            headers={"Content-Type": "application/json"},
        )

        def _post():
            # BUGFIX: close the HTTP response explicitly — urlopen() returns
            # an object holding a socket; the previous code dropped it and
            # leaked the connection until garbage collection.
            with urllib.request.urlopen(req, None, 2) as resp:
                resp.read()

        await asyncio.to_thread(_post)
    except Exception as e:
        try:
            _human_log('PBData', f"[notify] income notification to API failed: {e}", level='DEBUG')
        except Exception:
            pass
class PBData():
    def __init__(self):
        """Initialize PBData state: pid-file paths, DB/user handles, and the
        many runtime tuning knobs for price buffering, shared REST pollers,
        websocket restart handling, rate-limit budgets and the latest-1m
        market-data refresh loops. Most knobs can be overridden at runtime
        via pbgui.ini (see _load_settings)."""
        self.piddir = Path(f'{PBGDIR}/data/pid')
        if not self.piddir.exists():
            self.piddir.mkdir(parents=True)
        self.pidfile = Path(f'{self.piddir}/pbdata.pid')
        self.my_pid = None
        self.db = Database()
        self.users = Users()
        self._fetch_users = []
        self._trades_users = []
        self.load_fetch_users()
        self.load_trades_users()
        self._price_exchange_tasks = {}
        self._price_exchange_config = {}
        # Track which symbols we have already subscribed to per exchange
        self._price_subscribed_symbols = {}
        # In-memory buffer for latest prices per (user, symbol).
        # Key: (user_name, symbol) -> (timestamp, price)
        self._price_buffer = {}
        # Async lock protecting _price_buffer
        self._price_buffer_lock = asyncio.Lock()
        # Flush interval in seconds for buffered price writes
        self._price_flush_interval = 10.0
        # Background writer task handle
        self._price_writer_task = None
        # Enable/disable price buffering via env var `PB_PRICE_BUFFER` (default: enabled)
        # NOTE(review): no env-var read is visible here — the flag is simply
        # hard-coded True; confirm where PB_PRICE_BUFFER is actually consumed.
        self._price_buffer_enabled = True
        # (IO tracking removed) -- process/db IO debugging variables removed
        self._history_rest_last = {}
        # Log level used when a REST slot is busy and an update is skipped.
        # Default to 'DEBUG' to avoid spamming the logs; set to 'INFO' when
        # you want to surface these events without treating them as warnings.
        self._rest_slot_busy_log_level = 'DEBUG'
        # Log level used when the debounce flusher gives up after retries.
        # These events are informational in normal operation; set to 'DEBUG'
        # so they don't fill logs during normal runs.
        self._debounce_giveup_log_level = 'DEBUG'
        self._last_fetch_users_snapshot = set()
        self._last_exchange_queue_counts = {}
        self._last_queue_log_ts = 0.0
        self._queue_log_every_secs = 60.0
        self._last_loop_log_ts = 0.0
        self._loop_log_every_secs = 60.0
        self._last_mapping_log_ts_by_exchange = {}
        self._price_ticks_count = {}
        # network error log throttle map: (exchange,user) -> ts
        self._last_network_error_log_ts = {}
        self._network_error_log_throttle = 30.0
        # Max number of symbols to subscribe in one watch_tickers call
        self._price_subscribe_chunk_size = 20
        # Per-exchange overrides for subscribe chunk sizes (symbols per watch_tickers call)
        self._price_subscribe_chunk_size_by_exchange = {
            'hyperliquid': 5,
            'bitget': 5,
            'binance': 5,
            # bybit can be sensitive to large batch subscribes — use smaller chunks
            'bybit': 10,
        }
        # Stagger (ms) between starting private ws watchers to avoid bursts
        self._private_ws_stagger_ms = 200
        # Pause (s) between per-user REST calls in shared pollers to avoid bursts
        # Default small pause to reduce rate-limit triggers; can be overridden per-exchange
        # Tuned defaults to reduce observed 429s; adjust via pbgui.ini later if needed
        self._shared_rest_user_pause = 0.75
        self._shared_rest_pause_by_exchange = {
            # Increase hyperliquid pause to reduce 429s observed under load
            'hyperliquid': 3.0,
            # Increased to reduce REST fallback bursts that triggered 429s
            # (mitigation A): raised from 1.0 -> 3.0 seconds
            'bybit': 3.0,
        }
        # Per-exchange semaphore limits for REST slot gating are defined
        # near _rest_semaphores (see _rest_semaphore_limits_by_exchange).
        # Per-exchange limit for how many distinct users the price watcher may track
        # Some exchanges (hyperliquid) enforce a hard cap on tracked users for websocket topics.
        self._price_subscribe_user_limit_by_exchange = {
            'hyperliquid': 10,
        }
        self._mapping_rebuild_min_interval = 300.0  # seconds per exchange
        self._pollers_delay_seconds = 60.0
        self._pollers_enabled_after_ts = datetime.now().timestamp() + self._pollers_delay_seconds
        # Shared poller intervals (seconds)
        self._shared_combined_interval_seconds = 90
        self._shared_history_interval_seconds = 300
        self._shared_executions_interval_seconds = 1800
        # Balance has its own (longer) cadence — purely informational, no rush.
        self._poll_interval_balance_seconds = 300
        # Positions cadence — only changes after fills/liquidations.
        self._poll_interval_positions_seconds = 300
        # Orders cadence — position-per-symbol calls are expensive on HL; 60 s is still fresh enough.
        self._poll_interval_orders_seconds = 60
        # Users whose balance/positions/orders should be refreshed on the next combined cycle
        # (set by history/executions pollers when new entries are detected).
        self._balance_stale_users: set = set()
        self._positions_stale_users: set = set()
        self._orders_stale_users: set = set()
        # Flag to restart shared poller tasks when intervals change
        self._poll_intervals_changed = False
        self._pbgui_ini_mtime = None
        # Last loaded ws_max value from pbgui.ini (so we only reapply when changed)
        self._ws_max_loaded = None
        # Last loaded log_level for PBData (string like 'DEBUG'/'INFO')
        self._log_level_loaded = None
        # Snapshot of last trimmed allowed users per exchange to avoid repeated logs
        self._price_subscribe_trim_snapshot = {}
        # Track recent network-demoted users per exchange to avoid mass demotion
        self._exchange_network_error_users = defaultdict(dict)  # exchange -> {user_name: timestamp}
        self._network_error_locks = {}
        # Time window (seconds) during which only one demotion is allowed per exchange
        self._network_demotion_window = 60
        # Per-exchange backoff state and error tracking
        self._exchange_backoff_until = {}  # exchange -> timestamp until which we should backoff
        self._exchange_history_backoff_until = {}  # history-only backoff (long_history_poll) – does NOT block combined poller
        self._exchange_error_timestamps = defaultdict(list)  # exchange -> [ts1, ts2, ...]
        self._error_window_seconds = 30
        self._error_threshold = 6
        self._backoff_duration_seconds = 60
        # Timeout (seconds) used for asyncio.wait_for around ccxt.pro watch_* calls
        # Increase on slow VPS if you see ping-pong RequestTimeouts.
        self._price_watch_timeout = 120
        # Websocket restart-once state: track one restart per (exchange,user)
        # and consecutive successful watch messages to clear the restart marker.
        self._ws_restarted_once = set()  # set of (exchange, user_name)
        self._ws_success_counts = defaultdict(int)  # (exchange, user_name) -> consecutive successes
        self._ws_success_required = 3  # successes required to clear restart marker
        # Base sleep (s) before re-creating client. Increased to reduce
        # synchronized reconnect storms when many watchers detect keepalive
        # timeouts simultaneously.
        self._ws_restart_sleep = 1.5  # base sleep (s) before re-creating client
        # Per-exchange semaphore limiting concurrent reconnect attempts to 2.
        # When a Bybit/Hyperliquid server event drops all N connections at once,
        # without throttling all N tasks race to reconnect simultaneously, overwhelming
        # the asyncio event loop and delaying ping-pong for other exchanges.
        # Semaphore(2) lets at most 2 reconnects proceed at the same time; the rest
        # queue up naturally, spreading the reconnect storm over several seconds.
        self._exchange_reconnect_sem: dict = {}  # exchange -> asyncio.Semaphore(2)
        # Per-user last fetch timestamps (key: (user_name, kind) -> epoch seconds)
        # kind in {'balances','positions','orders','history'}
        self._last_fetch_ts = defaultdict(dict)
        # If a shared history poll takes longer than this (s) consider exchange overloaded
        # Increase threshold to avoid overly-aggressive backoffs for slower history endpoints
        self._long_poll_threshold_seconds = 60
        # Metrics task handle
        self._metrics_task = None
        # Metrics sampling interval (seconds), configurable via env PB_METRICS_INTERVAL
        self._metrics_interval = 10
        # ── Poller metrics (observable via API) ──────────────────
        # Per-exchange counters / timestamps that the GUI can display.
        # Written to data/logs/poller_metrics.json every _metrics_interval.
        self._poller_metrics = defaultdict(lambda: {
            'combined_last_ts': 0,  # epoch of last combined-poller cycle finish
            'combined_cycle_ms': 0,  # duration of last combined cycle (ms)
            'combined_users': 0,  # users polled in last combined cycle
            'history_last_ts': 0,  # epoch of last history-poller cycle finish
            'history_cycle_ms': 0,  # duration of last history cycle (ms)
            'history_users': 0,  # users polled in last history cycle
            'market_data_last_ts': 0,  # epoch of last market-data cycle finish
            'market_data_coins': 0,  # coins processed in last market-data cycle
            'rate_limit_429': 0,  # cumulative 429 count since startup
            'errors': 0,  # cumulative error count since startup
            'rest_slot_timeouts': 0,  # cumulative REST slot timeout count
        })
        # ── Rate-limit budgets (token bucket per exchange) ───────
        self._rate_budgets: dict[str, RateLimitBudget] = {}
        # Per-exchange lock: ensures only ONE poller (combined/history/executions)
        # draws from the budget at a time, preventing concurrent token starvation.
        self._budget_poller_locks: dict[str, asyncio.Lock] = {}
        for _exch_name, _cfg in EXCHANGE_RATE_LIMITS.items():
            self._rate_budgets[_exch_name] = RateLimitBudget(**_cfg)
            self._budget_poller_locks[_exch_name] = asyncio.Lock()
        # (IO debugging disabled) -- per-metrics-cycle DB/process IO logging removed
        # Latest 1m candles (API) auto-refresh settings
        self._latest_1m_enabled = True
        self._latest_1m_interval_seconds = 1800
        self._latest_1m_coin_pause_seconds = 2.0
        self._latest_1m_api_timeout_seconds = 30.0
        self._latest_1m_min_lookback_days = 2
        self._latest_1m_max_lookback_days = 4
        self._latest_1m_gap_stale_minutes = 15
        self._latest_1m_hist_interval_seconds = 1800
        self._latest_1m_last_hist_scan_ts = 0.0
        self._latest_1m_task = None
        # Binance latest 1m auto-refresh settings
        self._binance_latest_1m_enabled = True
        self._binance_latest_1m_interval_seconds = 3600
        self._binance_latest_1m_coin_pause_seconds = 0.5
        self._binance_latest_1m_api_timeout_seconds = 30.0
        self._binance_latest_1m_min_lookback_days = 2
        self._binance_latest_1m_max_lookback_days = 7
        self._binance_latest_1m_task = None
        # Bybit latest 1m auto-refresh settings
        self._bybit_latest_1m_enabled = True
        self._bybit_latest_1m_interval_seconds = 3600
        self._bybit_latest_1m_coin_pause_seconds = 0.5
        self._bybit_latest_1m_api_timeout_seconds = 30.0
        self._bybit_latest_1m_min_lookback_days = 2
        self._bybit_latest_1m_max_lookback_days = 7
        self._bybit_latest_1m_task = None
        self._market_data_status_path = _Path(f"{PBGDIR}/data/logs/market_data_status.json")
        self._market_data_status_lock = asyncio.Lock()
        # Load initial settings (ws_max, log_level, ...)
        try:
            self._load_settings()
        except Exception:
            pass
        # Caller-side private-client manager (Queue + background manager task)
        # This manager serializes private-client creation requests from PBData
        # so that check+reserve logic can be performed atomically on the caller
        # side and duplicate cap-warning log spam is avoided.
        self._private_client_queue = None
        self._private_client_manager_task = None
        # manager_state contains transient reservation set and warned flags
        self._private_client_manager_state = {
            'inflight': set(),  # set of keys currently reserved by manager
            'warned_global': False,  # whether a global-cap warning was emitted
            'warned_per_exch': {},  # exchange -> bool
        }
        # REST-fallback throttling: per-exchange semaphores to limit concurrent
        # REST calls issued as fallbacks when WS handlers cannot persist data.
        self._rest_semaphores = {}
        # Tunable per-exchange concurrent REST slots (lower for sensitive exchanges)
        self._rest_semaphore_limits_by_exchange = {
            'bybit': 1,
            'hyperliquid': 2,
        }
        self._default_rest_semaphore_limit = 3
        # How long (s) to wait to acquire a REST slot before skipping the update
        self._rest_semaphore_acquire_timeout = 5.0
        # Debounce settings for WS-driven DB writes (seconds)
        self._debounce_interval_by_kind = {
            'balances': 2.0,
            'positions': 2.0,
            'orders': 5.0,
        }
        # Maximum time we'll retry gated REST writes before giving up (seconds)
        self._debounce_max_retry_seconds = 30.0
        # In-memory debounce buffers: { kind: { user_name: { 'user': User, 'first_ts': float, 'task': asyncio.Task } } }
        self._debounce_buffers = {
            'balances': {},
            'positions': {},
            'orders': {},
        }
        # Register with Exchange to be notified when private clients are closed
        try:
            from Exchange import register_private_client_close_listener
            # register a small synchronous callback that schedules the async clear
            def _on_closed_cb(exchange_id, user_name):
                try:
                    loop = asyncio.get_running_loop()
                    loop.create_task(self._private_manager_maybe_clear_flags_for_exchange(exchange_id))
                except Exception:
                    pass
            register_private_client_close_listener(_on_closed_cb)
        except Exception:
            pass
    # Logging is centralized via the module-level `_log_central` function.
    def _load_settings(self):
        """Read `pbgui.ini` and update runtime settings when file changes.
        Currently loads:
        - pbdata.ws_max (int) -> passed to Exchange.set_ws_limits(global_max=...)
        - pbdata.log_level (str) -> sets minimum log level for PBData
        plus poller intervals, latest-1m settings (pbdata / binance_data /
        bybit_data sections), REST pauses and watch/acquire timeouts.
        The function uses the file mtime to avoid re-reading the INI too often.
        Best-effort: any parse error aborts silently, keeping current settings.
        """
        try:
            p = Path('pbgui.ini')
            if not p.exists():
                return
            # mtime gate: skip the re-parse when the INI has not changed.
            mtime = p.stat().st_mtime
            if self._pbgui_ini_mtime is not None and mtime == self._pbgui_ini_mtime:
                return
            self._pbgui_ini_mtime = mtime
            cfg = configparser.ConfigParser()
            cfg.read('pbgui.ini')
            # Note: payload debug flag removed — use log_level to control DEBUG logging.
            # ws_max (integer) - global cap for private websocket clients
            ws_max = None
            if cfg.has_option('pbdata', 'ws_max'):
                try:
                    raw = cfg.get('pbdata', 'ws_max')
                    sval = str(raw).strip() if raw is not None else ''
                    if sval != '':
                        try:
                            ws_max = int(sval)
                        except Exception:
                            ws_max = None
                except Exception:
                    ws_max = None
            # Only reapply the cap when the value actually changed.
            if ws_max is not None and ws_max != getattr(self, '_ws_max_loaded', None):
                try:
                    set_ws_limits(global_max=ws_max)
                    self._ws_max_loaded = ws_max
                    _human_log('PBData', f"Set Exchange.ws global cap via pbgui.ini [pbdata] ws_max={ws_max}", level='INFO')
                except Exception:
                    try:
                        _human_log('PBData', f"Failed to call Exchange.set_ws_limits with ws_max={ws_max}", level='WARNING')
                    except Exception:
                        pass
            # log_level (string) - minimum log level for PBData
            log_level = None
            if cfg.has_option('pbdata', 'log_level'):
                try:
                    raw = cfg.get('pbdata', 'log_level')
                    s = str(raw).strip() if raw is not None else ''
                    if s != '':
                        log_level = s.upper()
                except Exception:
                    log_level = None
            if log_level is not None and log_level != getattr(self, '_log_level_loaded', None):
                try:
                    set_service_min_level('PBData', log_level)
                    self._log_level_loaded = log_level
                    _human_log('PBData', f"PBData log level set via pbgui.ini [pbdata] log_level={log_level}", level='INFO')
                except Exception:
                    try:
                        _human_log('PBData', f"Failed to set PBData log level to {log_level}", level='WARNING')
                    except Exception:
                        pass
            # -----------------------------
            # Timers / intervals
            # -----------------------------
            def _get_int_opt(section: str, key: str):
                # Best-effort int reader: None when missing/blank/unparseable.
                # int(float(...)) also accepts values written as "90.0".
                try:
                    if not cfg.has_option(section, key):
                        return None
                    raw = cfg.get(section, key)
                    sval = str(raw).strip() if raw is not None else ''
                    if sval == '':
                        return None
                    return int(float(sval))
                except Exception:
                    return None
            def _get_float_opt(section: str, key: str):
                # Best-effort float reader: None when missing/blank/unparseable.
                try:
                    if not cfg.has_option(section, key):
                        return None
                    raw = cfg.get(section, key)
                    sval = str(raw).strip() if raw is not None else ''
                    if sval == '':
                        return None
                    return float(sval)
                except Exception:
                    return None
            # Startup/grace period for starting shared pollers
            new_pollers_delay = _get_int_opt('pbdata', 'pollers_delay_seconds')
            if new_pollers_delay is not None and new_pollers_delay >= 0:
                if new_pollers_delay != getattr(self, '_pollers_delay_seconds', 60.0):
                    old_enable_after = getattr(self, '_pollers_enabled_after_ts', None)
                    self._pollers_delay_seconds = float(new_pollers_delay)
                    # Only re-apply enable-after if pollers have not started yet
                    try:
                        now_ts = datetime.now().timestamp()
                        if old_enable_after is not None and now_ts < old_enable_after:
                            self._pollers_enabled_after_ts = now_ts + float(new_pollers_delay)
                    except Exception:
                        pass
            # Shared poller intervals; _poll_intervals_changed signals the
            # poller tasks to restart with the new cadence.
            new_combined = _get_int_opt('pbdata', 'poll_interval_combined_seconds')
            if new_combined is not None and new_combined > 0:
                if new_combined != getattr(self, '_shared_combined_interval_seconds', 90):
                    self._shared_combined_interval_seconds = int(new_combined)
                    self._poll_intervals_changed = True
            new_history = _get_int_opt('pbdata', 'poll_interval_history_seconds')
            if new_history is not None and new_history > 0:
                # NOTE(review): getattr fallback 90 differs from the __init__
                # default (300); harmless because the attribute always exists.
                if new_history != getattr(self, '_shared_history_interval_seconds', 90):
                    self._shared_history_interval_seconds = int(new_history)
                    self._poll_intervals_changed = True
            new_exec = _get_int_opt('pbdata', 'poll_interval_executions_seconds')
            if new_exec is not None and new_exec > 0:
                if new_exec != getattr(self, '_shared_executions_interval_seconds', 1800):
                    self._shared_executions_interval_seconds = int(new_exec)
                    self._poll_intervals_changed = True
            new_balance = _get_int_opt('pbdata', 'poll_interval_balance_seconds')
            if new_balance is not None and new_balance > 0:
                self._poll_interval_balance_seconds = int(new_balance)
            new_positions = _get_int_opt('pbdata', 'poll_interval_positions_seconds')
            if new_positions is not None and new_positions > 0:
                self._poll_interval_positions_seconds = int(new_positions)
            new_orders = _get_int_opt('pbdata', 'poll_interval_orders_seconds')
            if new_orders is not None and new_orders > 0:
                self._poll_interval_orders_seconds = int(new_orders)
            # Latest 1m API fetch interval
            new_latest_1m_interval = _get_int_opt('pbdata', 'latest_1m_interval_seconds')
            if new_latest_1m_interval is not None and new_latest_1m_interval > 0:
                # NOTE(review): getattr fallback 120 differs from the __init__
                # default (1800); harmless because the attribute always exists.
                if new_latest_1m_interval != getattr(self, '_latest_1m_interval_seconds', 120):
                    self._latest_1m_interval_seconds = int(new_latest_1m_interval)
            # Latest 1m pause between coins
            new_latest_1m_coin_pause = _get_float_opt('pbdata', 'latest_1m_coin_pause_seconds')
            if new_latest_1m_coin_pause is not None and new_latest_1m_coin_pause >= 0:
                self._latest_1m_coin_pause_seconds = float(new_latest_1m_coin_pause)
            # Latest 1m API timeout
            new_latest_1m_timeout = _get_float_opt('pbdata', 'latest_1m_api_timeout_seconds')
            if new_latest_1m_timeout is not None and new_latest_1m_timeout > 0:
                self._latest_1m_api_timeout_seconds = float(new_latest_1m_timeout)
            # Latest 1m lookback days (min/max)
            new_latest_1m_min_lb = _get_int_opt('pbdata', 'latest_1m_min_lookback_days')
            if new_latest_1m_min_lb is not None and new_latest_1m_min_lb > 0:
                self._latest_1m_min_lookback_days = int(new_latest_1m_min_lb)
            new_latest_1m_max_lb = _get_int_opt('pbdata', 'latest_1m_max_lookback_days')
            if new_latest_1m_max_lb is not None and new_latest_1m_max_lb > 0:
                self._latest_1m_max_lookback_days = int(new_latest_1m_max_lb)
            # Binance latest 1m settings
            bnc_interval = _get_int_opt('binance_data', 'latest_1m_interval_seconds')
            if bnc_interval is not None and bnc_interval > 0:
                self._binance_latest_1m_interval_seconds = int(bnc_interval)
            bnc_pause = _get_float_opt('binance_data', 'latest_1m_coin_pause_seconds')
            if bnc_pause is not None and bnc_pause >= 0:
                self._binance_latest_1m_coin_pause_seconds = float(bnc_pause)
            bnc_timeout = _get_float_opt('binance_data', 'latest_1m_api_timeout_seconds')
            if bnc_timeout is not None and bnc_timeout > 0:
                self._binance_latest_1m_api_timeout_seconds = float(bnc_timeout)
            bnc_min_lb = _get_int_opt('binance_data', 'latest_1m_min_lookback_days')
            if bnc_min_lb is not None and bnc_min_lb > 0:
                self._binance_latest_1m_min_lookback_days = int(bnc_min_lb)
            bnc_max_lb = _get_int_opt('binance_data', 'latest_1m_max_lookback_days')
            if bnc_max_lb is not None and bnc_max_lb > 0:
                self._binance_latest_1m_max_lookback_days = int(bnc_max_lb)
            # Bybit latest 1m settings
            bbt_interval = _get_int_opt('bybit_data', 'latest_1m_interval_seconds')
            if bbt_interval is not None and bbt_interval > 0:
                self._bybit_latest_1m_interval_seconds = int(bbt_interval)
            bbt_pause = _get_float_opt('bybit_data', 'latest_1m_coin_pause_seconds')
            if bbt_pause is not None and bbt_pause >= 0:
                self._bybit_latest_1m_coin_pause_seconds = float(bbt_pause)
            bbt_timeout = _get_float_opt('bybit_data', 'latest_1m_api_timeout_seconds')
            if bbt_timeout is not None and bbt_timeout > 0:
                self._bybit_latest_1m_api_timeout_seconds = float(bbt_timeout)
            bbt_min_lb = _get_int_opt('bybit_data', 'latest_1m_min_lookback_days')
            if bbt_min_lb is not None and bbt_min_lb > 0:
                self._bybit_latest_1m_min_lookback_days = int(bbt_min_lb)
            bbt_max_lb = _get_int_opt('bybit_data', 'latest_1m_max_lookback_days')
            if bbt_max_lb is not None and bbt_max_lb > 0:
                self._bybit_latest_1m_max_lookback_days = int(bbt_max_lb)
            # Pause between per-user REST calls in shared pollers
            new_rest_pause = _get_float_opt('pbdata', 'shared_rest_user_pause_seconds')
            if new_rest_pause is not None and new_rest_pause >= 0:
                self._shared_rest_user_pause = float(new_rest_pause)
            # Per-exchange overrides as JSON: {"exchange": seconds, ...}
            if cfg.has_option('pbdata', 'shared_rest_pause_by_exchange_json'):
                try:
                    raw = cfg.get('pbdata', 'shared_rest_pause_by_exchange_json')
                    sval = str(raw).strip() if raw is not None else ''
                    if sval != '':
                        obj = json.loads(sval)
                        if isinstance(obj, dict):
                            # Sanitize: keep only non-empty keys with
                            # non-negative numeric values.
                            cleaned = {}
                            for k, v in obj.items():
                                try:
                                    if k is None:
                                        continue
                                    exid = str(k).strip()
                                    if exid == '':
                                        continue
                                    sec = float(v)
                                    if sec < 0:
                                        continue
                                    cleaned[exid] = sec
                                except Exception:
                                    continue
                            # merge into existing defaults
                            if cleaned:
                                self._shared_rest_pause_by_exchange.update(cleaned)
                except Exception:
                    pass
            # price_watch_timeout (float, seconds) — timeout for watch_* calls
            new_pw_timeout = _get_float_opt('pbdata', 'price_watch_timeout')
            if new_pw_timeout is not None and new_pw_timeout > 0:
                self._price_watch_timeout = float(new_pw_timeout)
            # rest_semaphore_acquire_timeout (float, seconds) — REST slot acquire timeout
            new_rest_timeout = _get_float_opt('pbdata', 'rest_semaphore_acquire_timeout')
            if new_rest_timeout is not None and new_rest_timeout > 0:
                self._rest_semaphore_acquire_timeout = float(new_rest_timeout)
        except Exception:
            return
# -----------------------------
# Private client manager (caller-side)
# -----------------------------
def _start_private_client_manager(self):
"""Lazily start the private client manager background task."""
try:
if self._private_client_queue is None:
self._private_client_queue = asyncio.Queue()
if self._private_client_manager_task is None or self._private_client_manager_task.done():
self._private_client_manager_task = asyncio.create_task(self._private_client_manager_loop())
except Exception:
pass
def _write_market_data_status(self, payload: dict) -> None:
try:
logs_dir = _Path(f"{PBGDIR}/data/logs")
logs_dir.mkdir(parents=True, exist_ok=True)
tmp = self._market_data_status_path.with_suffix(".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2, sort_keys=True)
os.replace(tmp, self._market_data_status_path)
except Exception:
pass
async def _update_market_data_status(self, key: str, value) -> None:
"""Atomically update a single top-level key in market_data_status.json.
Uses an asyncio.Lock so concurrent loops (HL, Binance, …) never
clobber each other's sections, even if they await between read and write.
"""
async with self._market_data_status_lock:
existing: dict = {}
try:
if self._market_data_status_path.exists():
existing = json.loads(
self._market_data_status_path.read_text(encoding="utf-8")
)
except Exception:
pass
existing[key] = value
existing["timestamp"] = datetime.now().isoformat(sep=" ", timespec="seconds")
self._write_market_data_status(existing)
async def _latest_1m_loop(self):
    """Background loop: keep Hyperliquid 1m API candles fresh for enabled coins.

    Each cycle (every ``_latest_1m_interval_seconds``) this loop:
      1. reloads settings so GUI changes apply without restart,
      2. updates 1m candles per enabled coin, rate-budget gated,
      3. periodically scans historical l2Book coverage,
      4. mirrors progress into market_data_status.json under "latest_1m".

    Supports resuming a crashed mid-cycle run (via the saved "running"/
    "current_coin" state), a run-now flag file and a stop flag file (both
    under data/logs). Never raises: all errors are logged or swallowed so
    the loop keeps running.
    """
    # Initial grace period so startup I/O settles before the first cycle.
    await asyncio.sleep(5)
    _first_iter = True
    # Coin to resume after when recovering a cycle that crashed mid-run.
    _resume_after_coin: str = ""
    while True:
        try:
            # Reload settings from pbgui.ini each loop so GUI changes
            # to latest_1m_* settings take effect without restart.
            try:
                self._load_settings()
            except Exception:
                pass
            if not self._latest_1m_enabled:
                await asyncio.sleep(5)
                continue
            # On first start after restart: respect remaining interval or resume crashed mid-cycle
            if _first_iter:
                _first_iter = False
                try:
                    _saved = json.loads(self._market_data_status_path.read_text(encoding="utf-8")).get("latest_1m", {})
                    _last_ts = float(_saved.get("last_run_ts") or 0.0)
                    _interval = float(self._latest_1m_interval_seconds)
                    # Subtract the 5s startup sleep already spent above.
                    _remaining = _interval - (datetime.now().timestamp() - _last_ts) - 5.0
                    if _saved.get("running"):
                        # Previous process died mid-cycle: skip coins it already finished.
                        _resume_after_coin = str(_saved.get("current_coin") or "")
                        _human_log("PBData", f"[market-data] HL latest_1m: resuming after restart, skipping up to {_resume_after_coin!r}", level="INFO")
                    elif _remaining > 2.0:
                        _human_log("PBData", f"[market-data] HL latest_1m: {_remaining:.0f}s remaining in cycle — waiting on restart", level="INFO")
                        # A run-now flag file can cut the wait short.
                        _hl_flag_r = _Path(f"{PBGDIR}/data/logs/hyperliquid_latest_1m_run_now.flag")
                        if await _wait_for_flag(_hl_flag_r, _remaining):
                            try:
                                _hl_flag_r.unlink(missing_ok=True)
                            except Exception:
                                pass
                except Exception:
                    pass
            cfg = load_market_data_config()
            coins = list(cfg.enabled_coins.get("hyperliquid", []) or [])
            coins = [str(c).strip().upper() for c in coins if str(c).strip()]
            # Coins the fetcher reports as absent from live meta; auto-pruned at cycle end.
            invalid_live_meta_coins: set[str] = set()
            # Determine how many coins to skip when resuming a crashed mid-cycle
            _coins_done_offset = 0
            if _resume_after_coin:
                if _resume_after_coin in coins:
                    _coins_done_offset = coins.index(_resume_after_coin) + 1
                else:
                    _resume_after_coin = ""  # coin no longer enabled, start fresh
            now = datetime.now()
            now_ts = now.timestamp()
            # Seed with previous coin results so table stays filled during the run
            _prev_hl: dict = {}
            try:
                _prev_hl = json.loads(self._market_data_status_path.read_text(encoding="utf-8")).get("latest_1m", {}).get("coins", {})
            except Exception:
                pass
            status = {
                "exchange": "hyperliquid",
                "interval_seconds": int(self._latest_1m_interval_seconds),
                "last_run_ts": int(now_ts),
                "running": True,
                "current_coin": None,
                "coins_done": _coins_done_offset,
                "coins_total": len(coins),
                "coins": dict(_prev_hl),
            }
            try:
                await self._update_market_data_status("latest_1m", status)
            except Exception:
                pass
            _skipping = bool(_resume_after_coin)
            for coin in coins:
                # When resuming, fast-forward past coins the crashed run completed.
                if _skipping:
                    if coin == _resume_after_coin:
                        _skipping = False
                        _resume_after_coin = ""
                    continue
                coin_status = {
                    "last_fetch": None,
                    "result": "skipped",
                }
                max_lb = int(self._latest_1m_max_lookback_days)
                lookback_days = int(self._latest_1m_min_lookback_days)
                newest_day = ""
                try:
                    # Widen the lookback so it reaches back to the newest locally stored day.
                    cov = get_daily_hour_coverage_for_dataset("hyperliquid", "1m_api", coin)
                    newest_day = str(cov.get("newest_day") or "")
                    if newest_day:
                        d_new = datetime.strptime(newest_day, "%Y%m%d").date()
                        days_since = (datetime.utcnow().date() - d_new).days
                        if days_since < 0:
                            days_since = 0
                        lookback_days = max(lookback_days, days_since + 1)
                    else:
                        # No local API data yet: pull the full allowed lookback window.
                        lookback_days = max_lb
                        coin_status["note"] = "no_local_api_data"
                except Exception as e:
                    coin_status["error"] = f"coverage:{type(e).__name__}"
                # Clamp to the configured maximum lookback window.
                if lookback_days > max_lb:
                    coin_status["note"] = "api_window_limited"
                    lookback_days = max_lb
                try:
                    # Acquire rate budget for HL candleSnapshot.
                    # Weight = 44/day (20 base + ceil(1440 candles/60) = 44 per HL docs).
                    # update_latest_hyperliquid_1m_api_for_coin makes one API call per
                    # missing day → acquire the full upper bound up-front.
                    _cs_weight = get_weight('hyperliquid', 'candle_snapshot') * max(1, int(lookback_days))
                    if not await self._acquire_rate_budget('hyperliquid', 'candle_snapshot', timeout=120.0, weight_override=_cs_weight):
                        coin_status["last_fetch"] = datetime.now().isoformat(sep=" ", timespec="seconds")
                        coin_status["result"] = "budget_timeout"
                    else:
                        # Blocking REST fetch runs in a worker thread so the loop stays responsive.
                        res = await asyncio.to_thread(
                            update_latest_hyperliquid_1m_api_for_coin,
                            coin=coin,
                            lookback_days=int(lookback_days),
                            overwrite=False,
                            dry_run=False,
                            timeout_s=float(self._latest_1m_api_timeout_seconds),
                        )
                        coin_status["last_fetch"] = datetime.now().isoformat(sep=" ", timespec="seconds")
                        coin_status["result"] = "ok"
                        coin_status["lookback_days"] = int(lookback_days)
                        coin_status["newest_day"] = newest_day
                        coin_status["api_result"] = res
                        try:
                            # Fetcher says the coin is not in HL live meta: queue for auto-prune below.
                            if isinstance(res, dict) and bool(res.get("skipped")) and str(res.get("skip_reason") or "") == "not_in_live_meta":
                                invalid_live_meta_coins.add(str(coin).strip().upper())
                        except Exception:
                            pass
                        try:
                            _refresh_inventory_coin("hyperliquid", "1m_api", coin)
                        except Exception:
                            pass
                except Exception as e:
                    coin_status["last_fetch"] = datetime.now().isoformat(sep=" ", timespec="seconds")
                    coin_status["result"] = "error"
                    coin_status["error"] = str(e)
                status["coins"][coin] = coin_status
                status["coins_done"] = status.get("coins_done", 0) + 1
                status["current_coin"] = coin
                try:
                    await self._update_market_data_status("latest_1m", status)
                except Exception:
                    pass
                # Check stop flag
                _hl_stop = _Path(f"{PBGDIR}/data/logs/hyperliquid_latest_1m_stop.flag")
                if _hl_stop.exists():
                    try:
                        _hl_stop.unlink(missing_ok=True)
                    except Exception:
                        pass
                    # Mark this cycle stopped and abort the per-coin loop.
                    status["running"] = False
                    status["current_coin"] = None
                    status["stopped"] = True
                    status["last_run_duration_s"] = int(datetime.now().timestamp() - now_ts)
                    try:
                        await self._update_market_data_status("latest_1m", status)
                    except Exception:
                        pass
                    break
                # Pause between coins to avoid rate limits
                if self._latest_1m_coin_pause_seconds > 0:
                    await asyncio.sleep(float(self._latest_1m_coin_pause_seconds))
            _resume_after_coin = ""  # consumed — next iteration starts fresh
            if invalid_live_meta_coins:
                try:
                    # Drop coins HL no longer lists from the enabled set.
                    updated_enabled = [c for c in coins if str(c).strip().upper() not in invalid_live_meta_coins]
                    set_enabled_coins("hyperliquid", updated_enabled)
                    _human_log(
                        "PBData",
                        f"[market-data] removed invalid live-meta coins from enabled list: {', '.join(sorted(invalid_live_meta_coins))}",
                        level="WARNING",
                    )
                except Exception as e:
                    _human_log("PBData", f"[market-data] failed to auto-prune invalid live-meta coins: {e}", level="WARNING")
            # Periodic historical l2Book coverage scan, throttled by its own interval.
            if now_ts - float(self._latest_1m_last_hist_scan_ts or 0.0) >= float(self._latest_1m_hist_interval_seconds):
                self._latest_1m_last_hist_scan_ts = now_ts
                hist = {
                    "exchange": "hyperliquid",
                    "last_scan_ts": int(now_ts),
                    "coins": {},
                }
                for coin in coins:
                    cov = get_daily_hour_coverage_for_dataset("hyperliquid", "l2Book", coin)
                    hist["coins"][coin] = {
                        "oldest_day": str(cov.get("oldest_day") or ""),
                        "newest_day": str(cov.get("newest_day") or ""),
                        "missing_days_count": int(cov.get("missing_days_count") or 0),
                        "coverage_pct": float(cov.get("coverage_pct") or 0.0),
                    }
                status["historical"] = hist
            # Final status write for this cycle.
            status["running"] = False
            status["current_coin"] = None
            status["last_run_duration_s"] = int(datetime.now().timestamp() - now_ts)
            await self._update_market_data_status("latest_1m", status)
            if "historical" in status:
                await self._update_market_data_status("historical", status["historical"])
        except Exception as e:
            try:
                _human_log('PBData', f"[market-data] latest_1m loop error: {e}", level='WARNING')
            except Exception:
                pass
        # Sleep out the remainder of the interval, waking early on the run-now flag.
        _hl_flag = _Path(f"{PBGDIR}/data/logs/hyperliquid_latest_1m_run_now.flag")
        try:
            # NOTE(review): if the cycle failed before now_ts was assigned, the
            # NameError here is caught and we fall back to a full-interval wait.
            _elapsed = datetime.now().timestamp() - now_ts
            _remaining_wait = max(0.0, float(self._latest_1m_interval_seconds) - _elapsed)
        except Exception:
            _remaining_wait = float(self._latest_1m_interval_seconds)
        if await _wait_for_flag(_hl_flag, _remaining_wait):
            try:
                _hl_flag.unlink(missing_ok=True)
            except Exception:
                pass
async def _binance_latest_1m_loop(self):
"""Background loop: refresh Binance USDM 1m candles for enabled coins."""
await asyncio.sleep(8) # Slight offset from HL loop
_first_iter = True
_resume_after_coin: str = ""
while True:
try: