From 25c2720d5c2b5ff4bb0c7d1dad280f79dccc802c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 15:17:39 -0500 Subject: [PATCH] First draft storage layer cli Adds a `piker storage` subcmd with a `-d` flag to wipe a particular fqsn's time series (both 1s and 60s). Obviously this needs to be extended much more but provides a start point. --- piker/data/cli.py | 73 ++++++++++++++++++++++++++++++++++----- piker/data/marketstore.py | 23 +++++------- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 554048a46..994b9da43 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -19,7 +19,10 @@ """ from functools import partial -from pprint import pformat +from pprint import ( + pformat, + pprint, +) from anyio_marketstore import open_marketstore_client import trio @@ -113,15 +116,11 @@ async def main(): @cli.command() @click.option( - '--tl', - is_flag=True, - help='Enable tractor logging') -@click.option( - '--host', + '--tsdb_host', default='localhost' ) @click.option( - '--port', + '--tsdb_port', default=5993 ) @click.argument('symbols', nargs=-1) @@ -137,7 +136,7 @@ def storesh( Start an IPython shell ready to query the local marketstore db. ''' - from piker.data.marketstore import tsdb_history_update + from piker.data.marketstore import open_tsdb_client from piker._daemon import open_piker_runtime async def main(): @@ -148,7 +147,63 @@ async def main(): enable_modules=['piker.data._ahab'], ): symbol = symbols[0] - await tsdb_history_update(symbol) + + async with open_tsdb_client(symbol) as storage: + # TODO: ask if user wants to write history for detected + # available shm buffers? + from tractor.trionics import ipython_embed + await ipython_embed() + + trio.run(main) + + +@cli.command() +@click.option( + '--host', + default='localhost' +) +@click.option( + '--port', + default=5993 +) +@click.option( + '--delete', + '-d', + is_flag=True, + help='Delete history (1 Min) for symbol(s)', +) +@click.argument('symbols', nargs=-1) +@click.pass_obj +def storage( + config, + host, + port, + symbols: list[str], + delete: bool, + +): + ''' + Start an IPython shell ready to query the local marketstore db. + + ''' + from piker.data.marketstore import open_tsdb_client + from piker._daemon import open_piker_runtime + + async def main(): + nonlocal symbols + + async with open_piker_runtime( + 'tsdb_storage', + enable_modules=['piker.data._ahab'], + ): + symbol = symbols[0] + async with open_tsdb_client(symbol) as storage: + if delete: + for fqsn in symbols: + syms = await storage.client.list_symbols() + breakpoint() + await storage.delete_ts(fqsn, 60) + await storage.delete_ts(fqsn, 1) trio.run(main) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index d354f9b02..1403059c5 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -498,7 +498,6 @@ async def delete_ts( client = self.client syms = await client.list_symbols() - print(syms) if key not in syms: raise KeyError(f'`{key}` table key not found in\n{syms}?') @@ -615,10 +614,10 @@ async def open_storage_client( yield Storage(client) -async def tsdb_history_update( - fqsn: Optional[str] = None, - -) -> list[str]: +@acm +async def open_tsdb_client( + fqsn: str, +) -> Storage: # TODO: real-time dedicated task for ensuring # history consistency between the tsdb, shm and real-time feed.. @@ -647,7 +646,7 @@ async def tsdb_history_update( # - https://github.com/pikers/piker/issues/98 # profiler = Profiler( - disabled=False, # not pg_profile_enabled(), + disabled=True, # not pg_profile_enabled(), delayed=False, ) @@ -688,14 +687,10 @@ async def tsdb_history_update( # profiler('Finished db arrays diffs') - syms = await storage.client.list_symbols() - log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') - profiler(f'listed symbols {syms}') - - # TODO: ask if user wants to write history for detected - # available shm buffers? - from tractor.trionics import ipython_embed - await ipython_embed() + syms = await storage.client.list_symbols() + # log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') + # profiler(f'listed symbols {syms}') + yield storage # for array in [to_append, to_prepend]: # if array is None: