Skip to content

Commit db80b49

Browse files
committed
Implement plugins infrastructure
Plugins are python classes supposed to serve as workers to perform any additional operations which are not performed from strategies. Example use-cases: * Download trading history * Analyze trading history * Check for updates * Report dexbot statistic Current implementation uses separate thread and asyncio event loop inside. This is a temporary solution before refactoring WorkerInfrastructure to asyncio.
1 parent 85fce2f commit db80b49

File tree

5 files changed

+128
-0
lines changed

5 files changed

+128
-0
lines changed

dexbot/helper.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import shutil
44
import errno
55
import logging
6+
import pkgutil
67
from appdirs import user_data_dir
78

89
from dexbot import APP_NAME, AUTHOR
@@ -77,6 +78,15 @@ def initialize_orders_log():
7778
logger.info("worker_name;ID;operation_type;base_asset;base_amount;quote_asset;quote_amount;timestamp")
7879

7980

81+
def iter_namespace(ns_pkg):
82+
# https://packaging.python.org/guides/creating-and-discovering-plugins/
83+
# Specifying the second argument (prefix) to iter_modules makes the
84+
# returned name an absolute name instead of a relative one. This allows
85+
# import_module to work without having to do additional modification to
86+
# the name.
87+
return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".")
88+
89+
8090
try:
8191
# Unfortunately setuptools is only "kinda-sorta" a standard module
8292
# it's available on pretty much any modern Python system, but some embedded Pythons may not have it

dexbot/plugin.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import asyncio
2+
import threading
3+
import importlib
4+
import logging
5+
6+
import dexbot.plugins
7+
from dexbot.helper import iter_namespace
8+
9+
from bitshares import BitShares
10+
11+
log = logging.getLogger(__name__)
12+
13+
class PluginInfrastructure(threading.Thread):
14+
""" Run plugins as asyncio tasks
15+
16+
:param dict config: dexbot config
17+
18+
PluginInfrastructure class is needed to be able to run asyncio plugins while having synchronous core. After
19+
switching to asyncio-aware main thread we may continue to use all plugins without refactoring them.
20+
"""
21+
22+
def __init__(self, config):
23+
super().__init__()
24+
25+
self.bitshares = BitShares(node=config['node'], num_retries=-1)
26+
self.config = config
27+
self.loop = None
28+
self.need_stop = False
29+
self.plugins = []
30+
31+
def run(self):
32+
log.debug('Starting PluginInfrastructure thread')
33+
self.init_plugins()
34+
self.loop = asyncio.new_event_loop()
35+
asyncio.set_event_loop(self.loop)
36+
self.loop.create_task(self.run_plugins())
37+
self.loop.create_task(self.stop_handler())
38+
self.loop.run_forever()
39+
40+
def init_plugins(self):
41+
""" Initialize plugin instances
42+
"""
43+
plugins = {name: importlib.import_module(name) for finder, name, ispkg in iter_namespace(dexbot.plugins)}
44+
45+
for name, plugin in plugins.items():
46+
self.plugins.append(plugin.Plugin(config=self.config, bitshares_instance=self.bitshares))
47+
48+
async def run_plugins(self):
49+
""" Run each discovered plugin by calling Plugin.main()
50+
"""
51+
# Schedule every plugin as asyncio Task; use ensure_future() for python3.6 compatibility
52+
tasks = [asyncio.ensure_future(plugin.main()) for plugin in self.plugins]
53+
try:
54+
# Wait until all plugins are finished, but catch exceptions immediately as they occure
55+
await asyncio.gather(*tasks, return_exceptions=False)
56+
except asyncio.CancelledError:
57+
# Note: task.cancel() will not propagate this exception here, so it will appear only on current task cancel
58+
log.debug('Stopping run_plugins()')
59+
except Exception:
60+
log.exception('Task finished with exception:')
61+
62+
async def stop_handler(self):
63+
""" Watch for self.need_stop flag to cancel tasks and stop the thread
64+
65+
With this solution it's easier to achieve correct tasks stopping. self.loop.call_soon_threadsafe() requires
66+
additional wrapping to stop tasks or catch exceptions.
67+
"""
68+
while True:
69+
if self.need_stop:
70+
log.debug('Stopping event loop')
71+
tasks = [task for task in asyncio.Task.all_tasks() if task is not asyncio.tasks.Task.current_task()]
72+
# Cancel all tasks
73+
list(map(lambda task: task.cancel(), tasks))
74+
# Wait for tasks finish
75+
results = await asyncio.gather(*tasks, return_exceptions=True)
76+
log.debug('Finished awaiting cancelled tasks, results: {0}'.format(results))
77+
# Stop the event loop
78+
self.loop.stop()
79+
return
80+
else:
81+
await asyncio.sleep(1)

dexbot/plugins/__init__.py

Whitespace-only changes.

dexbot/plugins/dummy.py.example

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import asyncio
2+
import logging
3+
4+
log = logging.getLogger(__name__)
5+
6+
7+
class Plugin:
8+
""" Example plugin class
9+
10+
Plugin must have main() method to run. main() is expected to be an asyncio coroutine
11+
"""
12+
13+
def __init__(self, config=None, bitshares_instance=None):
14+
pass
15+
16+
async def do_stuff(self):
17+
log.info('Doing some stuff')
18+
await asyncio.sleep(10)
19+
log.info('Stuff done')
20+
21+
async def boom(self):
22+
raise Exception('Boom!')
23+
24+
async def main(self):
25+
try:
26+
while True:
27+
await self.do_stuff()
28+
await asyncio.sleep(5)
29+
await self.boom()
30+
except asyncio.CancelledError:
31+
log.info('Stopping correctly')

dexbot/worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import copy
77

88
import dexbot.errors as errors
9+
from dexbot.plugin import PluginInfrastructure
910
from dexbot.strategies.base import StrategyBase
1011

1112
from bitshares import BitShares
@@ -173,6 +174,8 @@ def add_worker(self, worker_name, config):
173174
self.update_notify()
174175

175176
def run(self):
177+
self.plugins_thread = PluginInfrastructure(self.config)
178+
self.plugins_thread.start()
176179
self.init_workers(self.config)
177180
self.update_notify()
178181
self.notify.listen()
@@ -206,6 +209,9 @@ def stop(self, worker_name=None, pause=False):
206209
self.workers[worker].pause()
207210
self.workers = []
208211

212+
# Notify plugins to stop
213+
self.plugins_thread.need_stop = True
214+
209215
# Update other workers
210216
if len(self.workers) > 0:
211217
self.update_notify()

0 commit comments

Comments
 (0)