From c85324f142cf1f0d52a5b1982f2734ec98dac73b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Feb 2023 00:22:45 -0500 Subject: [PATCH 01/11] `deribit`: drop removed (now deprecated and removed) `.backfill_bars()` endpoint --- piker/brokers/deribit/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index f5c48b58d..b921ea673 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -30,7 +30,6 @@ open_history_client, open_symbol_search, stream_quotes, - backfill_bars ) # from .broker import ( # trades_dialogue, From 18d70447cd7d64a781aad217c9a812602964c1ef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Feb 2023 00:23:16 -0500 Subject: [PATCH 02/11] `deribit`: various lib API compat fixes - port to new `msgspec` "default fields must come after non-default ones" shite they changed. - adjust to `open_jsonrpc_session()` kwarg remap: `dtype` -> `response_type=JSONRPCResult`. --- piker/brokers/deribit/api.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 4159b18a7..1fd345544 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -89,19 +89,20 @@ class JSONRPCResult(Struct): - jsonrpc: str = '2.0' id: int - result: Optional[dict] = None - error: Optional[dict] = None - usIn: int - usOut: int + usIn: int + usOut: int usDiff: int testnet: bool + jsonrpc: str = '2.0' + result: Optional[dict] = None + error: Optional[dict] = None + class JSONRPCChannel(Struct): - jsonrpc: str = '2.0' method: str params: dict + jsonrpc: str = '2.0' class KLinesResult(Struct): @@ -114,6 +115,7 @@ class KLinesResult(Struct): ticks: list[int] volume: list[float] + class Trade(Struct): trade_seq: int trade_id: str @@ -125,9 +127,10 @@ class Trade(Struct): instrument_name: str index_price: float direction: str + amount: float combo_trade_id: Optional[int] = 0, combo_id: Optional[str] = '', - amount: float + class LastTradesResult(Struct): trades: list[Trade] @@ -145,8 +148,8 @@ def str_to_cb_sym(name: str) -> Symbol: quote = base if option_type == 'put': - option_type = PUT - elif option_type == 'call': + option_type = PUT + elif option_type == 'call': option_type = CALL else: raise Exception("Couldn\'t parse option type") @@ -167,8 +170,8 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: quote = base if option_type == 'P': - option_type = PUT - elif option_type == 'C': + option_type = PUT + elif option_type == 'C': option_type = CALL else: raise Exception("Couldn\'t parse option type") @@ -431,7 +434,9 @@ async def get_client( async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, dtype=JSONRPCResult) as json_rpc + _testnet_ws_url, + response_type=JSONRPCResult + ) as json_rpc ): client = Client(json_rpc) From 3bfe541259cb49a8fd5e6b12c145c5cf83be37c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Feb 2023 00:25:53 -0500 Subject: [PATCH 03/11] `deribit`: fix history query routine sig to take `timeframe: float` --- piker/brokers/deribit/feed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index deb0422f8..865d0e6f9 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -69,6 +69,7 @@ async def open_history_client( async with open_cached_client('deribit') as client: async def get_ohlc( + timeframe: float, end_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None, From e6fd2adb69ff46ae9f7ca8e6e7ca64267b05eac2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Feb 2023 00:26:31 -0500 Subject: [PATCH 04/11] Include `deribit` backend in default brokers scan set --- piker/brokers/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index a35e4aea2..16069a055 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -24,6 +24,7 @@ 'binance', 'ib', 'kraken', + 'deribit', # broken but used to work # 'questrade', From 2ea850eed0bcf0c37767a864cefbdfcb9c5601eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Mar 2023 00:23:09 -0500 Subject: [PATCH 05/11] `deribit`: add new `Trade.block_trade_id` field.. --- piker/brokers/deribit/api.py | 5 +++-- piker/brokers/deribit/feed.py | 7 +++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 1fd345544..f93a20e59 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -128,8 +128,9 @@ class Trade(Struct): index_price: float direction: str amount: float - combo_trade_id: Optional[int] = 0, - combo_id: Optional[str] = '', + combo_trade_id: Optional[int] = 0 + combo_id: Optional[str] = '' + block_trade_id: str | None = '' class LastTradesResult(Struct): diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 865d0e6f9..7161aab52 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -46,9 +46,12 @@ from cryptofeed.symbols import Symbol from .api import ( - Client, Trade, + Client, + Trade, get_config, - str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + str_to_cb_sym, + piker_sym_to_cb_sym, + cb_sym_to_deribit_inst, maybe_open_price_feed ) From 1bd421a0f3b88e91b3acc85784cb2ef7647457d1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Mar 2023 12:27:03 -0500 Subject: [PATCH 06/11] Block hist queries for non-60s --- piker/brokers/deribit/feed.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 7161aab52..d77a7e1b1 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -81,12 +81,15 @@ async def get_ohlc( datetime, # start datetime, # end ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') array = await client.bars( instrument, start_dt=start_dt, end_dt=end_dt, ) + if len(array) == 0: raise DataUnavailable From 4c838474bebd2949bf73fa71e4109bbb3953c88c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Mar 2023 18:32:24 -0500 Subject: [PATCH 07/11] `flake8` linter cleanup and comment out order ctl draft code --- piker/brokers/deribit/api.py | 251 ++++++++++++++++++---------------- piker/brokers/deribit/feed.py | 24 +--- 2 files changed, 139 insertions(+), 136 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index f93a20e59..62b4b7882 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) +# Copyright (C) Guillermo Rodriguez (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -18,49 +18,48 @@ Deribit backend. ''' -import json +from __future__ import annotations import time import asyncio -from contextlib import asynccontextmanager as acm, AsyncExitStack +from contextlib import asynccontextmanager as acm from functools import partial from datetime import datetime -from typing import Any, Optional, Iterable, Callable +from typing import ( + Any, + Optional, + Callable, +) +from cryptofeed import FeedHandler +from cryptofeed.defines import ( + DERIBIT, + L1_BOOK, + TRADES, + OPTION, + CALL, + PUT, +) import pendulum -import asks import trio -from trio_typing import Nursery, TaskStatus +from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy import numpy as np +from tractor.trionics import ( + broadcast_receiver, + maybe_open_context +) +from tractor import to_asyncio +from cryptofeed.symbols import Symbol from piker.data.types import Struct from piker.data._web_bs import ( - NoBsWs, - open_autorecon_ws, open_jsonrpc_session ) -from .._util import resproc - from piker import config from piker.log import get_logger -from tractor.trionics import ( - broadcast_receiver, - BroadcastReceiver, - maybe_open_context -) -from tractor import to_asyncio - -from cryptofeed import FeedHandler - -from cryptofeed.defines import ( - DERIBIT, - L1_BOOK, TRADES, - OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol log = get_logger(__name__) @@ -189,8 +188,13 @@ def cb_sym_to_deribit_inst(sym: Symbol): # cryptofeed normalized cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] + # deribit specific + months = [ + 'JAN', 'FEB', 'MAR', + 'APR', 'MAY', 'JUN', + 'JUL', 'AUG', 'SEP', + 'OCT', 'NOV', 'DEC', + ] exp = sym.expiry_date @@ -210,14 +214,15 @@ def get_config() -> dict[str, Any]: section = conf.get('deribit') - # TODO: document why we send this, basically because logging params for cryptofeed + # TODO: document why we send this, basically because logging params + # for cryptofeed conf['log'] = {} conf['log']['disabled'] = True if section is None: log.warning(f'No config section found for deribit in {path}') - return conf + return conf class Client: @@ -364,6 +369,7 @@ async def bars( end_dt: Optional[datetime] = None, limit: int = 1000, as_np: bool = True, + ) -> dict: instrument = symbol @@ -389,14 +395,8 @@ async def bars( result = KLinesResult(**resp.result) new_bars = [] - for i in range(len(result.close)): - - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] + for i in range(len(result.close)): row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], @@ -409,7 +409,7 @@ async def bars( new_bars.append((i,) + tuple(row)) - array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else new_bars return array async def last_trades( @@ -463,7 +463,7 @@ async def _auth_loop( if time.time() - _expiry_time < renew_time: # if we are close to token expiry time - if _refresh_token != None: + if _refresh_token is not None: # if we have a refresh token already dont need to send # secret params = { @@ -473,7 +473,8 @@ async def _auth_loop( } else: - # we don't have refresh token, send secret to initialize + # we don't have refresh token, send secret to + # initialize params = { 'grant_type': 'client_credentials', 'client_id': client._key_id, @@ -541,20 +542,30 @@ async def _trade(data: dict, receipt_timestamp): })) async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} - ] - })) + to_trio.send_nowait( + ('l1', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'ticks': [ + + {'type': 'bid', + 'price': float(data.bid_price), + 'size': float(data.bid_size)}, + + {'type': 'bsize', + 'price': float(data.bid_price), + 'size': float(data.bid_size)}, + + {'type': 'ask', + 'price': float(data.ask_price), + 'size': float(data.ask_size)}, + + {'type': 'asize', + 'price': float(data.ask_price), + 'size': float(data.ask_size)} + ] + }) + ) fh.add_feed( DERIBIT, @@ -610,69 +621,71 @@ async def maybe_open_price_feed( yield feed - -async def aio_order_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() - - async def _order_info(data: dict, receipt_timestamp): - breakpoint() - - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument.upper()], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, - }) - - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) - - # sync with trio - to_trio.send_nowait(None) - - await asyncio.sleep(float('inf')) - - -@acm -async def open_order_feed( - instrument: list[str] -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan - - -@acm -async def maybe_open_order_feed( - instrument: str -) -> trio.abc.ReceiveStream: - - # TODO: add a predicate to maybe_open_context - async with maybe_open_context( - acm_func=open_order_feed, - kwargs={ - 'instrument': instrument, - 'fh': fh - }, - key=f'{instrument}-order', - ) as (cache_hit, feed): - if cache_hit: - yield broadcast_receiver(feed, 10) - else: - yield feed +# TODO: order broker support: this is all draft code from @guilledk B) + +# async def aio_order_feed_relay( +# fh: FeedHandler, +# instrument: Symbol, +# from_trio: asyncio.Queue, +# to_trio: trio.abc.SendChannel, + +# ) -> None: +# async def _fill(data: dict, receipt_timestamp): +# breakpoint() + +# async def _order_info(data: dict, receipt_timestamp): +# breakpoint() + +# fh.add_feed( +# DERIBIT, +# channels=[FILLS, ORDER_INFO], +# symbols=[instrument.upper()], +# callbacks={ +# FILLS: _fill, +# ORDER_INFO: _order_info, +# }) + +# if not fh.running: +# fh.run( +# start_loop=False, +# install_signal_handlers=False) + +# # sync with trio +# to_trio.send_nowait(None) + +# await asyncio.sleep(float('inf')) + + +# @acm +# async def open_order_feed( +# instrument: list[str] +# ) -> trio.abc.ReceiveStream: +# async with maybe_open_feed_handler() as fh: +# async with to_asyncio.open_channel_from( +# partial( +# aio_order_feed_relay, +# fh, +# instrument +# ) +# ) as (first, chan): +# yield chan + + +# @acm +# async def maybe_open_order_feed( +# instrument: str +# ) -> trio.abc.ReceiveStream: + +# # TODO: add a predicate to maybe_open_context +# async with maybe_open_context( +# acm_func=open_order_feed, +# kwargs={ +# 'instrument': instrument, +# 'fh': fh +# }, +# key=f'{instrument}-order', +# ) as (cache_hit, feed): +# if cache_hit: +# yield broadcast_receiver(feed, 10) +# else: +# yield feed diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index d77a7e1b1..da5211ced 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -20,36 +20,26 @@ ''' from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, Callable -import time +from typing import ( + Callable, +) import trio from trio_typing import TaskStatus import pendulum -from fuzzywuzzy import process as fuzzy import numpy as np import tractor from piker._cacheables import open_cached_client from piker.log import get_logger, get_console_log -from piker.data import ShmArray from piker.brokers._util import ( - BrokerError, DataUnavailable, ) -from cryptofeed import FeedHandler - -from cryptofeed.defines import ( - DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol from .api import ( Client, Trade, - get_config, - str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, maybe_open_price_feed @@ -73,8 +63,8 @@ async def open_history_client( async def get_ohlc( timeframe: float, - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, @@ -139,7 +129,7 @@ async def stream_quotes( async with maybe_open_price_feed(sym) as stream: - cache = await client.cache_symbols() + await client.cache_symbols() last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades @@ -181,7 +171,7 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = await client.cache_symbols() + await client.cache_symbols() await ctx.started() async with ctx.open_stream() as stream: From 1c833e7175f70a2fe56a88a8174167c964039a98 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Wed, 8 Mar 2023 13:32:47 -0300 Subject: [PATCH 08/11] Remove cryptofeeds/asyncio from deribit backend Add hook management to open_jsonrpc_session helper --- piker/brokers/deribit/__init__.py | 10 -- piker/brokers/deribit/api.py | 262 ++++++++++-------------------- piker/brokers/deribit/feed.py | 23 +-- piker/data/_web_bs.py | 51 ++++-- 4 files changed, 130 insertions(+), 216 deletions(-) diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index b921ea673..4b71e22c0 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -52,13 +52,3 @@ 'feed', # 'broker', ] - -# passed to ``tractor.ActorNursery.start_actor()`` -_spawn_kwargs = { - 'infect_asyncio': True, -} - -# annotation to let backend agnostic code -# know if ``brokerd`` should be spawned with -# ``tractor``'s aio mode. -_infect_asyncio: bool = True diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 62b4b7882..59a49a588 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -20,7 +20,6 @@ ''' from __future__ import annotations import time -import asyncio from contextlib import asynccontextmanager as acm from functools import partial @@ -31,15 +30,6 @@ Callable, ) -from cryptofeed import FeedHandler -from cryptofeed.defines import ( - DERIBIT, - L1_BOOK, - TRADES, - OPTION, - CALL, - PUT, -) import pendulum import trio from trio_typing import TaskStatus @@ -49,8 +39,6 @@ broadcast_receiver, maybe_open_context ) -from tractor import to_asyncio -from cryptofeed.symbols import Symbol from piker.data.types import Struct from piker.data._web_bs import ( @@ -59,16 +47,12 @@ from piker import config from piker.log import get_logger +from piker._cacheables import open_cached_client log = get_logger(__name__) -_spawn_kwargs = { - 'infect_asyncio': True, -} - - _url = 'https://www.deribit.com' _ws_url = 'wss://www.deribit.com/ws/api/v2' _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' @@ -142,70 +126,12 @@ def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) -def str_to_cb_sym(name: str) -> Symbol: - base, strike_price, expiry_date, option_type = name.split('-') - - quote = base - - if option_type == 'put': - option_type = PUT - elif option_type == 'call': - option_type = CALL - else: - raise Exception("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) - - -def piker_sym_to_cb_sym(name: str) -> Symbol: - base, expiry_date, strike_price, option_type = tuple( - name.upper().split('-')) - - quote = base - - if option_type == 'P': - option_type = PUT - elif option_type == 'C': - option_type = CALL - else: - raise Exception("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date.upper()) +def sym_fmt_piker_to_deribit(sym: str) -> str: + return sym.upper() -def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - - # deribit specific - months = [ - 'JAN', 'FEB', 'MAR', - 'APR', 'MAY', 'JUN', - 'JUL', 'AUG', 'SEP', - 'OCT', 'NOV', 'DEC', - ] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - - otype = 'C' if sym.option_type == CALL else 'P' - - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' +def sym_fmt_deribit_to_piker(sym: str): + return sym.lower() def get_config() -> dict[str, Any]: @@ -214,11 +140,6 @@ def get_config() -> dict[str, Any]: section = conf.get('deribit') - # TODO: document why we send this, basically because logging params - # for cryptofeed - conf['log'] = {} - conf['log']['disabled'] = True - if section is None: log.warning(f'No config section found for deribit in {path}') @@ -227,7 +148,13 @@ def get_config() -> dict[str, Any]: class Client: - def __init__(self, json_rpc: Callable) -> None: + def __init__( + self, + json_rpc: Callable, + update_hooks: Callable, + update_types: Callable, + ) -> None: + self._pairs: dict[str, Any] = None config = get_config().get('deribit', {}) @@ -241,6 +168,8 @@ def __init__(self, json_rpc: Callable) -> None: self._key_secret = None self.json_rpc = json_rpc + self.update_hooks = update_hooks + self.update_types = update_types @property def currencies(self): @@ -287,7 +216,7 @@ async def submit_limit( """Place an order """ params = { - 'instrument_name': symbol.upper(), + 'instrument_name': sym_fmt_piker_to_deribit(symbol), 'amount': size, 'type': 'limit', 'price': price, @@ -328,7 +257,7 @@ async def symbol_info( results = resp.result instruments = { - item['instrument_name'].lower(): item + sym_fmt_deribit_to_piker(item['instrument_name']): item for item in results } @@ -359,8 +288,10 @@ async def search_symbols( limit=limit ) # repack in dict form - return {item[0]['instrument_name'].lower(): item[0] - for item in matches} + return { + sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0] + for item in matches + } async def bars( self, @@ -387,7 +318,7 @@ async def bars( resp = await self.json_rpc( 'public/get_tradingview_chart_data', params={ - 'instrument_name': instrument.upper(), + 'instrument_name': sym_fmt_piker_to_deribit(instrument), 'start_timestamp': start_time, 'end_timestamp': end_time, 'resolution': '1' @@ -420,13 +351,19 @@ async def last_trades( resp = await self.json_rpc( 'public/get_last_trades_by_instrument', params={ - 'instrument_name': instrument, + 'instrument_name': sym_fmt_piker_to_deribit(instrument), 'count': count }) return LastTradesResult(**resp.result) +class JSONRPCSubRequest(Struct): + method: str + params: dict + jsonrpc: str = '2.0' + + @acm async def get_client( is_brokercheck: bool = False @@ -435,11 +372,11 @@ async def get_client( async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, + _ws_url, response_type=JSONRPCResult - ) as json_rpc + ) as control_functions ): - client = Client(json_rpc) + client = Client(*control_functions) _refresh_token: Optional[str] = None _access_token: Optional[str] = None @@ -452,7 +389,7 @@ async def _auth_loop( https://docs.deribit.com/?python#authentication-2 """ - renew_time = 10 + renew_time = 240 access_scope = 'trade:read_write' _expiry_time = time.time() got_access = False @@ -482,7 +419,7 @@ async def _auth_loop( 'scope': access_scope } - resp = await json_rpc('public/auth', params) + resp = await client.json_rpc('public/auth', params) result = resp.result _expiry_time = time.time() + result['expires_in'] @@ -510,96 +447,67 @@ async def _auth_loop( @acm -async def open_feed_handler(): - fh = FeedHandler(config=get_config()) - yield fh - await to_asyncio.run_task(fh.stop_async) - +async def open_price_feed( + instrument: str +) -> trio.abc.ReceiveStream: -@acm -async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: - async with maybe_open_context( - acm_func=open_feed_handler, - key='feedhandler', - ) as (cache_hit, fh): - yield fh - - -async def aio_price_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) - - async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait( - ('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), + instrument_db = sym_fmt_piker_to_deribit(instrument) + + trades_chan = f'trades.{instrument_db}.raw' + book_chan = f'book.{instrument_db}.none.1.100ms' + + channels = [trades_chan, book_chan] + + send_chann, recv_chann = trio.open_memory_channel(0) + async def sub_hook(msg): + chan = msg.params['channel'] + data = msg.params['data'] + if chan == trades_chan: + await send_chann.send(( + 'trade', { + 'symbol': instrument, + 'last': data['price'], + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': data['price'], + 'size': data['amount'], + 'broker_ts': data['timestamp'] + }] + } + )) + + elif chan == book_chan: + bid, bsize = data['bids'][0] + ask, asize = data['asks'][0] + await send_chann.send(( + 'l1', { + 'symbol': instrument, 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize} + ]} + )) - {'type': 'bid', - 'price': float(data.bid_price), - 'size': float(data.bid_size)}, - - {'type': 'bsize', - 'price': float(data.bid_price), - 'size': float(data.bid_size)}, + async with open_cached_client('deribit') as client: - {'type': 'ask', - 'price': float(data.ask_price), - 'size': float(data.ask_size)}, - - {'type': 'asize', - 'price': float(data.ask_price), - 'size': float(data.ask_size)} - ] - }) - ) - - fh.add_feed( - DERIBIT, - channels=[TRADES, L1_BOOK], - symbols=[piker_sym_to_cb_sym(instrument)], - callbacks={ - TRADES: _trade, - L1_BOOK: _l1 + client.update_hooks({ + 'request': sub_hook + }) + client.update_types({ + 'request': JSONRPCSubRequest }) - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) - - # sync with trio - to_trio.send_nowait(None) + resp = await client.json_rpc( + 'private/subscribe', {'channels': channels}) - await asyncio.sleep(float('inf')) + assert resp.result == channels + log.info(f'Subscribed to {channels}') -@acm -async def open_price_feed( - instrument: str -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_price_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan + yield recv_chann @acm diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index da5211ced..84e04fa17 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -40,16 +40,9 @@ from .api import ( Client, Trade, - piker_sym_to_cb_sym, - cb_sym_to_deribit_inst, maybe_open_price_feed ) -_spawn_kwargs = { - 'infect_asyncio': True, -} - - log = get_logger(__name__) @@ -107,10 +100,7 @@ async def stream_quotes( sym = symbols[0] - async with ( - open_cached_client('deribit') as client, - send_chan as send_chan - ): + async with open_cached_client('deribit') as client: init_msgs = { # pass back token, and bool, signalling if we're the writer @@ -118,21 +108,18 @@ async def stream_quotes( sym: { 'symbol_info': { 'asset_type': 'option', - 'price_tick_size': 0.0005 + 'price_tick_size': 0.0005, + 'lot_tick_size': 0.1 }, 'shm_write_opts': {'sum_tick_vml': False}, 'fqsn': sym, }, } - nsym = piker_sym_to_cb_sym(sym) - async with maybe_open_price_feed(sym) as stream: - - await client.cache_symbols() + last_trades = (await client.last_trades(sym, count=1)).trades - last_trades = (await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades + async with maybe_open_price_feed(sym) as stream: if len(last_trades) == 0: last_trade = None diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 2dd7f4afe..41aab6ac5 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -187,7 +187,6 @@ async def open_autorecon_ws( ''' - class JSONRPCResult(Struct): id: int jsonrpc: str = '2.0' @@ -202,9 +201,32 @@ async def open_jsonrpc_session( response_type: type = JSONRPCResult, request_type: Optional[type] = None, request_hook: Optional[Callable] = None, - error_hook: Optional[Callable] = None, + error_hook: Optional[Callable] = None ) -> Callable[[str, dict], dict]: + # xor: this two params need to be passed together or not at all + if bool(request_type) ^ bool(request_hook): + raise ValueError( + 'Need to path both a request_type and request_hook') + + hook_table = { + 'request': request_hook, + 'error': error_hook + } + + types_table = { + 'response': response_type, + 'request': request_type + } + + def update_hooks(new_hooks: dict): + nonlocal hook_table + hook_table.update(new_hooks) + + def update_types(new_types: dict): + nonlocal types_table + types_table.update(new_types) + async with ( trio.open_nursery() as n, open_autorecon_ws(url) as ws @@ -257,8 +279,7 @@ async def recv_task(): 'result': _, 'id': mid, } if res_entry := rpc_results.get(mid): - - res_entry['result'] = response_type(**msg) + res_entry['result'] = types_table['response'](**msg) res_entry['event'].set() case { @@ -269,24 +290,32 @@ async def recv_task(): f'Unexpected ws msg: {json.dumps(msg, indent=4)}' ) + case { + 'error': error, + 'id': mid + } if res_entry := rpc_results.get(mid): + + res_entry['result'] = types_table['response'](**msg) + res_entry['event'].set() + case { 'method': _, 'params': _, }: - log.debug(f'Recieved\n{msg}') - if request_hook: - await request_hook(request_type(**msg)) + log.info(f'Recieved\n{msg}') + if hook_table['request']: + await hook_table['request'](types_table['request'](**msg)) case { - 'error': error + 'error': error, }: log.warning(f'Recieved\n{error}') - if error_hook: - await error_hook(response_type(**msg)) + if hook_table['error']: + await hook_table['error'](types_table['response'](**msg)) case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task) - yield json_rpc + yield json_rpc, update_hooks, update_types n.cancel_scope.cancel() From fef8073113eed0d536021af47d97eff10722cb9e Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Thu, 9 Mar 2023 13:46:19 -0300 Subject: [PATCH 09/11] Add new documented api get_book_summary_by_currency --- piker/brokers/deribit/api.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 59a49a588..321908c98 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -31,6 +31,7 @@ ) import pendulum +import asks import trio from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy @@ -357,6 +358,19 @@ async def last_trades( return LastTradesResult(**resp.result) + async def get_book_summary( + self, + currency: str, + kind: str = 'option' + ): + return await self.json_rpc( + 'public/get_book_summary_by_currency', + params={ + 'currency': currency, + 'kind': kind + }) + + class JSONRPCSubRequest(Struct): method: str From 77fbc7eb86401844b0a644835b13fec3051e9098 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Fri, 10 Mar 2023 13:25:40 -0300 Subject: [PATCH 10/11] Fruther generalize json_rpc hook mechanic to allow for multi hook, Add new maybe_open_ticker_feed to stream greeks, iv, open interest of an instrument --- piker/brokers/deribit/api.py | 146 +++++++++++++++++------------------ piker/data/_web_bs.py | 35 ++++++--- 2 files changed, 98 insertions(+), 83 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 321908c98..628a37f8e 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -152,7 +152,7 @@ class Client: def __init__( self, json_rpc: Callable, - update_hooks: Callable, + append_hooks: Callable, update_types: Callable, ) -> None: @@ -169,7 +169,7 @@ def __init__( self._key_secret = None self.json_rpc = json_rpc - self.update_hooks = update_hooks + self.append_hooks = append_hooks self.update_types = update_types @property @@ -490,6 +490,7 @@ async def sub_hook(msg): }] } )) + return True elif chan == book_chan: bid, bsize = data['bids'][0] @@ -504,11 +505,14 @@ async def sub_hook(msg): {'type': 'asize', 'price': ask, 'size': asize} ]} )) + return True + + return False async with open_cached_client('deribit') as client: - client.update_hooks({ - 'request': sub_hook + client.append_hooks({ + 'request': [sub_hook] }) client.update_types({ 'request': JSONRPCSubRequest @@ -517,12 +521,15 @@ async def sub_hook(msg): resp = await client.json_rpc( 'private/subscribe', {'channels': channels}) - assert resp.result == channels + assert not resp.error log.info(f'Subscribed to {channels}') yield recv_chann + resp = await client.json_rpc('private/unsubscribe', {'channels': channels}) + + assert not resp.error @acm async def maybe_open_price_feed( @@ -543,71 +550,64 @@ async def maybe_open_price_feed( yield feed -# TODO: order broker support: this is all draft code from @guilledk B) - -# async def aio_order_feed_relay( -# fh: FeedHandler, -# instrument: Symbol, -# from_trio: asyncio.Queue, -# to_trio: trio.abc.SendChannel, - -# ) -> None: -# async def _fill(data: dict, receipt_timestamp): -# breakpoint() - -# async def _order_info(data: dict, receipt_timestamp): -# breakpoint() - -# fh.add_feed( -# DERIBIT, -# channels=[FILLS, ORDER_INFO], -# symbols=[instrument.upper()], -# callbacks={ -# FILLS: _fill, -# ORDER_INFO: _order_info, -# }) - -# if not fh.running: -# fh.run( -# start_loop=False, -# install_signal_handlers=False) - -# # sync with trio -# to_trio.send_nowait(None) - -# await asyncio.sleep(float('inf')) - - -# @acm -# async def open_order_feed( -# instrument: list[str] -# ) -> trio.abc.ReceiveStream: -# async with maybe_open_feed_handler() as fh: -# async with to_asyncio.open_channel_from( -# partial( -# aio_order_feed_relay, -# fh, -# instrument -# ) -# ) as (first, chan): -# yield chan - - -# @acm -# async def maybe_open_order_feed( -# instrument: str -# ) -> trio.abc.ReceiveStream: - -# # TODO: add a predicate to maybe_open_context -# async with maybe_open_context( -# acm_func=open_order_feed, -# kwargs={ -# 'instrument': instrument, -# 'fh': fh -# }, -# key=f'{instrument}-order', -# ) as (cache_hit, feed): -# if cache_hit: -# yield broadcast_receiver(feed, 10) -# else: -# yield feed +@acm +async def open_ticker_feed( + instrument: str +) -> trio.abc.ReceiveStream: + + instrument_db = sym_fmt_piker_to_deribit(instrument) + + ticker_chan = f'incremental_ticker.{instrument_db}' + + channels = [ticker_chan] + + send_chann, recv_chann = trio.open_memory_channel(0) + async def sub_hook(msg): + chann = msg.params['channel'] + if chann == ticker_chan: + data = msg.params['data'] + await send_chann.send(( + 'ticker', { + 'symbol': instrument, + 'data': data + } + )) + return True + + return False + + async with open_cached_client('deribit') as client: + + client.append_hooks({ + 'request': [sub_hook] + }) + + resp = await client.json_rpc( + 'private/subscribe', {'channels': channels}) + + assert not resp.error + + log.info(f'Subscribed to {channels}') + + yield recv_chann + + resp = await client.json_rpc('private/unsubscribe', {'channels': channels}) + + assert not resp.error + +@acm +async def maybe_open_ticker_feed( + instrument: str +) -> trio.abc.ReceiveStream: + + async with maybe_open_context( + acm_func=open_ticker_feed, + kwargs={ + 'instrument': instrument + }, + key=f'{instrument}-ticker', + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 41aab6ac5..2780a75a1 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -209,9 +209,17 @@ async def open_jsonrpc_session( raise ValueError( 'Need to path both a request_type and request_hook') + req_hooks = [] + if request_hook: + req_hooks.append(request_hook) + + err_hooks = [] + if error_hook: + err_hooks.append(error_hook) + hook_table = { - 'request': request_hook, - 'error': error_hook + 'request': req_hooks, + 'error': err_hooks } types_table = { @@ -219,9 +227,10 @@ async def open_jsonrpc_session( 'request': request_type } - def update_hooks(new_hooks: dict): + def append_hooks(new_hooks: dict): nonlocal hook_table - hook_table.update(new_hooks) + for htype, hooks in new_hooks.items(): + hook_table[htype] += hooks def update_types(new_types: dict): nonlocal types_table @@ -234,7 +243,7 @@ def update_types(new_types: dict): rpc_id: Iterable = count(start_id) rpc_results: dict[int, dict] = {} - async def json_rpc(method: str, params: dict) -> dict: + async def json_rpc(method: str, params: dict = {}) -> dict: ''' perform a json rpc call and wait for the result, raise exception in case of error field present on response @@ -303,19 +312,25 @@ async def recv_task(): 'params': _, }: log.info(f'Recieved\n{msg}') - if hook_table['request']: - await hook_table['request'](types_table['request'](**msg)) + if len(hook_table['request']) > 0: + for hook in hook_table['request']: + result = await hook(types_table['request'](**msg)) + if result: + break case { 'error': error, }: log.warning(f'Recieved\n{error}') - if hook_table['error']: - await hook_table['error'](types_table['response'](**msg)) + if len(hook_table['error']) > 0: + for hook in hook_table['error']: + result = await hook(types_table['response'](**msg)) + if result: + break case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task) - yield json_rpc, update_hooks, update_types + yield json_rpc, append_hooks, update_types n.cancel_scope.cancel() From 926ab1dfa623b85a097a9b0ebade82e00fc803f3 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Fri, 10 Mar 2023 17:09:18 -0300 Subject: [PATCH 11/11] Add stream ticker test --- piker/brokers/deribit/api.py | 31 ++++++++++++++---------- tests/test_deribit.py | 46 ++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 12 deletions(-) create mode 100644 tests/test_deribit.py diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 628a37f8e..94d17ca08 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -154,19 +154,13 @@ def __init__( json_rpc: Callable, append_hooks: Callable, update_types: Callable, + key_id: str | None = None, + key_secret: str | None = None ) -> None: self._pairs: dict[str, Any] = None - - config = get_config().get('deribit', {}) - - if ('key_id' in config) and ('key_secret' in config): - self._key_id = config['key_id'] - self._key_secret = config['key_secret'] - - else: - self._key_id = None - self._key_secret = None + self._key_id = key_id + self._key_secret = key_secret self.json_rpc = json_rpc self.append_hooks = append_hooks @@ -383,14 +377,24 @@ async def get_client( is_brokercheck: bool = False ) -> Client: + config = get_config().get('deribit', {}) + + ws_url = config.get('ws_url', _ws_url) + key_id = config.get('key_id', None) + key_secret = config.get('key_secret', None) + async with ( trio.open_nursery() as n, open_jsonrpc_session( - _ws_url, + ws_url, response_type=JSONRPCResult ) as control_functions ): - client = Client(*control_functions) + client = Client( + *control_functions, + key_id=key_id, + key_secret=key_secret + ) _refresh_token: Optional[str] = None _access_token: Optional[str] = None @@ -581,6 +585,9 @@ async def sub_hook(msg): client.append_hooks({ 'request': [sub_hook] }) + client.update_types({ + 'request': JSONRPCSubRequest + }) resp = await client.json_rpc( 'private/subscribe', {'channels': channels}) diff --git a/tests/test_deribit.py b/tests/test_deribit.py new file mode 100644 index 000000000..ac37534f0 --- /dev/null +++ b/tests/test_deribit.py @@ -0,0 +1,46 @@ +import trio +import pytest +import tractor + +from piker import config + +from piker.brokers.deribit import api as deribit +from piker.brokers.deribit.api import _testnet_ws_url + +from piker._cacheables import open_cached_client + + +TESTNET_KEY_ID: str | None = None +TESTNET_KEY_SECRET: str | None = None + +@pytest.mark.skipif( + not TESTNET_KEY_ID or not TESTNET_KEY_SECRET, + reason='configure a deribit testnet key pair before running this test' +) +def test_deribit_get_ticker(open_test_pikerd): + + async def _test_main(): + async with open_test_pikerd() as _: + async with open_cached_client('deribit') as client: + + symbols = await client.symbol_info() + + syms = list(symbols.keys()) + sym = syms[int(len(syms) / 2)] + + async with deribit.maybe_open_ticker_feed(sym) as tick_stream: + async for typ, msg in tick_stream: + assert typ == 'ticker' + assert 'open_interest' in msg['data'] + break + + + + config.write({ + 'deribit': { + 'ws_url': _testnet_ws_url, + 'key_id': TESTNET_KEY_ID, + 'key_secret': TESTNET_KEY_SECRET + } + }) + trio.run(_test_main)