Skip to content

Commit dd91225

Browse files
committed
Handle time-indexing for fill arrows
Call into a reworked `Flume.get_index()` for both the slow and fast chart and do time index clipping to last datum where necessary.
1 parent 2c42a06 commit dd91225

File tree

2 files changed

+36
-28
lines changed

2 files changed

+36
-28
lines changed

piker/data/flows.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@
4848
from ._sampling import (
4949
iter_ohlc_periods,
5050
)
51-
from .._profile import (
52-
Profiler,
53-
pg_profile_enabled,
54-
)
51+
# from .._profile import (
52+
# Profiler,
53+
# pg_profile_enabled,
54+
# )
5555

5656
if TYPE_CHECKING:
57-
from pyqtgraph import PlotItem
57+
# from pyqtgraph import PlotItem
5858
from .feed import Feed
5959

6060

@@ -235,18 +235,18 @@ def from_msg(cls, msg: dict) -> dict:
235235
def get_index(
236236
self,
237237
time_s: float,
238+
array: np.ndarray,
238239

239-
) -> int:
240+
) -> int | float:
240241
'''
241242
Return array shm-buffer index for for epoch time.
242243
243244
'''
244-
array = self.rt_shm.array
245245
times = array['time']
246-
mask = (times >= time_s)
247-
248-
if any(mask):
249-
return array['index'][mask][0]
250-
251-
# just the latest index
252-
return array['index'][-1]
246+
first = np.searchsorted(
247+
times,
248+
time_s,
249+
side='left',
250+
)
251+
imx = times.shape[0] - 1
252+
return min(first, imx)

piker/ui/order_mode.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ def on_fill(
494494

495495
uuid: str,
496496
price: float,
497-
arrow_index: float,
497+
time_s: float,
498498

499499
pointing: Optional[str] = None,
500500

@@ -513,18 +513,26 @@ def on_fill(
513513
'''
514514
dialog = self.dialogs[uuid]
515515
lines = dialog.lines
516+
chart = self.chart
517+
516518
# XXX: seems to fail on certain types of races?
517519
# assert len(lines) == 2
518520
if lines:
519-
flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn]
521+
flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn]
520522
_, _, ratio = flume.get_ds_info()
521-
for i, chart in [
522-
(arrow_index, self.chart),
523-
(flume.izero_hist
524-
+
525-
round((arrow_index - flume.izero_rt)/ratio),
526-
self.hist_chart)
523+
524+
for chart, shm in [
525+
(self.chart, flume.rt_shm),
526+
(self.hist_chart, flume.hist_shm),
527527
]:
528+
viz = chart.get_viz(chart.name)
529+
index_field = viz.index_field
530+
arr = shm.array
531+
index = flume.get_index(time_s, arr)
532+
533+
if index_field == 'time':
534+
i = arr['time'][index]
535+
528536
self.arrows.add(
529537
chart.plotItem,
530538
uuid,
@@ -933,6 +941,8 @@ async def process_trade_msg(
933941
fmsg = pformat(msg)
934942
log.debug(f'Received order msg:\n{fmsg}')
935943
name = msg['name']
944+
viz = mode.chart.get_viz(mode.chart.name)
945+
index_field = viz.index_field
936946

937947
if name in (
938948
'position',
@@ -1037,11 +1047,11 @@ async def process_trade_msg(
10371047
# should only be one "fill" for an alert
10381048
# add a triangle and remove the level line
10391049
req = Order(**req)
1040-
index = flume.get_index(time.time())
1050+
tm = time.time()
10411051
mode.on_fill(
10421052
oid,
10431053
price=req.price,
1044-
arrow_index=index,
1054+
time_s=tm,
10451055
)
10461056
mode.lines.remove_line(uuid=oid)
10471057
msg.req = req
@@ -1080,10 +1090,8 @@ async def process_trade_msg(
10801090
# a true backend one? This will require finagling
10811091
# with how each backend tracks/summarizes time
10821092
# stamps for the downstream API.
1083-
index = flume.get_index(
1084-
details['broker_time']
1085-
)
1086-
1093+
tm = details['broker_time']
1094+
index = flume.get_index(tm, index_field)
10871095
# TODO: some kinda progress system
10881096
mode.on_fill(
10891097
oid,

0 commit comments

Comments
 (0)