diff --git a/piker/_daemon.py b/piker/_daemon.py index 8983ecccf..c7ee09d53 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -327,6 +327,7 @@ async def open_pikerd( # db init flags tsdb: bool = False, es: bool = False, + mpd: bool = False, ) -> Services: ''' @@ -396,6 +397,14 @@ async def open_pikerd( f'config: {pformat(config)}' ) + if mpd: + from piker.data._max_pain_daemon import start_max_pain_daemon + + start_max_pain_daemon() + + log.info('Collecting data from deribit...') + + # assign globally for future daemon/task creation Services.actor_n = actor_nursery Services.service_n = service_nursery diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 9b6f225ce..4c0bdaa74 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -54,6 +54,11 @@ is_flag=True, help='Enable local ``elasticsearch`` instance' ) +@click.option( + '--mpd', + is_flag=True, + help='Read from deribit and dump data to elastic db' +) def pikerd( loglevel: str, host: str, @@ -62,6 +67,7 @@ def pikerd( pdb: bool, tsdb: bool, es: bool, + mpd: bool, ): ''' Spawn the piker broker-daemon. @@ -92,6 +98,7 @@ async def main(): open_pikerd( tsdb=tsdb, es=es, + mpd=mpd, loglevel=loglevel, debug_mode=pdb, registry_addr=reg_addr, diff --git a/piker/data/_max_pain_daemon.py b/piker/data/_max_pain_daemon.py new file mode 100644 index 000000000..a36b64a58 --- /dev/null +++ b/piker/data/_max_pain_daemon.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python +import time +import json +from datetime import datetime, timedelta +import requests + +from cryptofeed import FeedHandler +from cryptofeed.callback import OrderInfoCallback, BalancesCallback, UserFillsCallback +from cryptofeed.exchanges.deribit import Deribit +from cryptofeed.defines import DERIBIT, TRADES, OPEN_INTEREST, OPTION, CALL, PUT +from cryptofeed.symbols import Symbol + +from elasticsearch import Elasticsearch + +from .elastic import ( + ES_HOST, + oi_mapping, + trades_mapping, + es_prefix +) + +def start_max_pain_daemon(): + + config = \ + { + 'log':{ + 'filename': 'feedhandler.log', + 'level': 'INFO' + }, + 'deribit': { + 'key_id': 'KPJN6bFQ', + 'key_secret': 'TMPatQRXMQUQ83OCVoqYomcMvPYpTUPOUqayuJJ3zMA' + } + } + + # maybe import this function from ṕiker.brokers.deribit.api + def piker_sym_to_cb_sym(name: str) -> Symbol: + base, expiry_date, strike_price, option_type = tuple( + name.upper().split('-')) + quote = base + + if option_type == 'P': + option_type = PUT + elif option_type == 'C': + option_type = CALL + else: + raise Exception("Couldn\'t parse option type") + + return Symbol( + base, + quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date.upper() + ) + + def get_instruments(currency, kind): + payload = {'currency': currency, 'kind': kind} + r = requests.get('https://test.deribit.com/api/v2/public/get_instruments', params=payload) + resp = json.loads(r.text) + response_list = [] + + # For now only get half of the instruments + for i in range(len(resp['result']) // 2): + element = resp['result'][i] + response_list.append(piker_sym_to_cb_sym(element['instrument_name'])) + + return response_list + + # trade and oi are user defined functions that + # will be called when trade and open interest updates are received + # data type is not dict, is an object: cryptofeed.types.OpenINterest + async def oi(data: dict, receipt_timestamp): + + # Get timestamp and convert it to isoformat + date = (datetime.utcfromtimestamp(data.timestamp)).isoformat() + + index = es_prefix(data.symbol, 'oi') + document = { + 'timestamp': date, + 'open_interest': data.open_interest + } + #Save to db + es.index(index=index, document=document) + print('Saving to db...') + print(date) + print(data) + + # Data type is not dict, is an object: cryptofeed.types.Ticker + async def trade(data: dict, receipt_timestamp): + # Get timestamp and convert it to isoformat + date = (datetime.utcfromtimestamp(data.timestamp)).isoformat() + + index = es_prefix(data.symbol, 'trades') + document = { + 'direction': data.side, + 'amount': data.amount, + 'price': data.price, + 'timestamp': date, + } + #Save to db + es.index(index=index, document=document) + print('Saving to db...') + print(date) + print(data) + + callbacks = {TRADES: trade, OPEN_INTEREST: oi} + + fh = FeedHandler(config=config) + + fh.add_feed( + DERIBIT, + channels=[TRADES, OPEN_INTEREST], + symbols=get_instruments('BTC', 'option'), + callbacks=callbacks + ) + + es = Elasticsearch(ES_HOST) + + es.indices.put_template( + name='oi_mapping', + body=oi_mapping) + + es.indices.put_template( + name='trades_mapping', + body=trades_mapping) + + fh.run() + +# if __name__ == '__main__': +# main() diff --git a/piker/data/elastic.py b/piker/data/elastic.py index 43c6afd08..15ae95f45 100644 --- a/piker/data/elastic.py +++ b/piker/data/elastic.py @@ -37,6 +37,7 @@ ) import asks +import math log = get_logger(__name__) @@ -107,3 +108,71 @@ async def stop_matcher(msg: str): start_matcher, stop_matcher, ) + + + +ES_HOST = 'http://localhost:19200' + +shards = 2; +replicas = 0; +refresh = '1s'; + +compression = 'best_compression'; + +default_idx_settings = { + 'index': { + 'number_of_shards': shards, + 'refresh_interval': refresh, + 'number_of_replicas': replicas, + 'codec': compression + } +}; + +oi_mapping = { + 'order': 0, + 'index_patterns': [ + '*-oi' + ], + 'settings': default_idx_settings, + 'mappings': { + 'properties': { + 'timestamp': {'type': 'date'}, + 'open_interest': {'type': 'double'} + } + } +}; + +trades_mapping = { + 'order': 0, + 'index_patterns': [ + '*-trades' + ], + 'settings': default_idx_settings, + 'mappings': { + 'properties': { + 'direction': {'type': 'keyword'}, + 'amount': {'type': 'double'}, + 'price': {'type': 'double'}, + 'timestamp': {'type': 'date'} + } + } +}; + +max_pain_mapping = { + 'order': 0, + 'index_patterns': [ + '*-max-pain' + ], + 'settings': default_idx_settings, + 'mappings': { + 'properties': { + 'max_pain': {'type': 'double'}, + 'dollar_value': {'type': 'double'}, + 'timestamp': {'type': 'date'} + } + } +} + + +def es_prefix(instrument_name, kind): + return f'{instrument_name.lower()}-{kind}' \ No newline at end of file diff --git a/setup.py b/setup.py index 2a686cc55..e3850a2b9 100755 --- a/setup.py +++ b/setup.py @@ -87,7 +87,8 @@ ], 'es': [ 'docker', - 'elasticsearch' + 'elasticsearch', + 'cryptofeed', ] }, tests_require=['pytest'],