Skip to content
This repository was archived by the owner on Nov 18, 2024. It is now read-only.

Commit 7f9cd5c

Browse files
committed
- Added STREAM_DEAD message, which means the stream needs to be restarted by calling connect_stream().
- Added a reference to the thread that receives the stream that gets returned when you call connect_stream() - You can now access the ThetaClient object that you called connect_stream() on inside the callback method by using msg.client - Bug Fixes
1 parent 422937e commit 7f9cd5c

File tree

3 files changed

+43
-34
lines changed

3 files changed

+43
-34
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "thetadata"
7-
version = "0.8.8"
7+
version = "0.8.9"
88
authors = [
99
{ name="Bailey Danseglio", email="[email protected]" },
1010
{ name="Adler Weber", email="[email protected]" },

thetadata/client.py

+41-33
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from .terminal import check_download, launch_terminal
2424

2525
_NOT_CONNECTED_MSG = "You must establish a connection first."
26-
_VERSION = '0.8.8'
26+
_VERSION = '0.8.9'
2727

2828

2929
def _format_strike(strike: float) -> int:
@@ -224,6 +224,7 @@ def to_string(self) -> str:
224224
class StreamMsg:
225225
"""Stream Msg"""
226226
def __init__(self):
227+
self.client = None
227228
self.type = StreamMsgType.ERROR
228229
self.req_response = None
229230
self.req_response_id = None
@@ -313,14 +314,15 @@ def connect(self):
313314
finally:
314315
self._server.close()
315316

316-
def connect_stream(self, callback):
317+
def connect_stream(self, callback) -> Thread:
317318
"""Initiate a connection with the Theta Terminal Stream server.
318319
Requests can only be made inside this generator aka the `with client.connect_stream()` block.
319320
Responses to the provided callback method are recycled, meaning that if you send data received
320321
in the callback method to another thread, you must create a copy of it first.
321322
322323
:raises ConnectionRefusedError: If the connection failed.
323324
:raises TimeoutError: If the timeout is set and has been reached.
325+
:return: The thread that is responsible for receiving messages.
324326
"""
325327
for i in range(15):
326328
try:
@@ -335,7 +337,9 @@ def connect_stream(self, callback):
335337
sleep(1)
336338
self._stream_server.settimeout(self.timeout)
337339
self._stream_impl = callback
338-
Thread(target=self._recv_stream).start()
340+
out = Thread(target=self._recv_stream)
341+
out.start()
342+
return out
339343

340344
def close_stream(self):
341345
self._stream_server.close()
@@ -481,39 +485,43 @@ def _recv_stream(self):
481485
"""from_bytes
482486
"""
483487
msg = StreamMsg()
484-
488+
msg.client = self
485489
parse_int = lambda d: int.from_bytes(d, "big")
486490

487491
while True:
488-
msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
489-
msg.contract.from_bytes(self._read_stream(parse_int(self._read_stream(1)[:1])))
490-
491-
if msg.type == StreamMsgType.QUOTE:
492-
msg.quote.from_bytes(self._read_stream(44))
493-
elif msg.type == StreamMsgType.TRADE:
494-
data = self._read_stream(n_bytes=32)
495-
msg.trade.from_bytes(data)
496-
elif msg.type == StreamMsgType.OHLCVC:
497-
data = self._read_stream(n_bytes=36)
498-
msg.ohlcvc.from_bytes(data)
499-
elif msg.type == StreamMsgType.PING:
500-
self._read_stream(n_bytes=4)
501-
continue
502-
elif msg.type == StreamMsgType.OPEN_INTEREST:
503-
data = self._read_stream(n_bytes=8)
504-
msg.open_interest.from_bytes(data)
505-
elif msg.type == StreamMsgType.REQ_RESPONSE:
506-
msg.req_response_id = parse_int(self._read_stream(4))
507-
msg.req_response = StreamResponseType.from_code(parse_int(self._read_stream(4)))
508-
self._stream_responses[msg.req_response_id] = msg.req_response
509-
elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START:
510-
msg.date = datetime.strptime(str(parse_int(self._read_stream(4))), "%Y%m%d").date()
511-
elif msg.type == StreamMsgType.DISCONNECTED or msg.type == StreamMsgType.RECONNECTED:
512-
self._read_stream(4) # Future use.
513-
else:
514-
raise ValueError('undefined msg type: ' + str(msg.type))
515-
516-
self._stream_impl(msg)
492+
try:
493+
msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
494+
msg.contract.from_bytes(self._read_stream(parse_int(self._read_stream(1)[:1])))
495+
if msg.type == StreamMsgType.QUOTE:
496+
msg.quote.from_bytes(self._read_stream(44))
497+
elif msg.type == StreamMsgType.TRADE:
498+
data = self._read_stream(n_bytes=32)
499+
msg.trade.from_bytes(data)
500+
elif msg.type == StreamMsgType.OHLCVC:
501+
data = self._read_stream(n_bytes=36)
502+
msg.ohlcvc.from_bytes(data)
503+
elif msg.type == StreamMsgType.PING:
504+
self._read_stream(n_bytes=4)
505+
continue
506+
elif msg.type == StreamMsgType.OPEN_INTEREST:
507+
data = self._read_stream(n_bytes=8)
508+
msg.open_interest.from_bytes(data)
509+
elif msg.type == StreamMsgType.REQ_RESPONSE:
510+
msg.req_response_id = parse_int(self._read_stream(4))
511+
msg.req_response = StreamResponseType.from_code(parse_int(self._read_stream(4)))
512+
self._stream_responses[msg.req_response_id] = msg.req_response
513+
elif msg.type == StreamMsgType.STOP or msg.type == StreamMsgType.START:
514+
msg.date = datetime.strptime(str(parse_int(self._read_stream(4))), "%Y%m%d").date()
515+
elif msg.type == StreamMsgType.DISCONNECTED or msg.type == StreamMsgType.RECONNECTED:
516+
self._read_stream(4) # Future use.
517+
else:
518+
raise ValueError('undefined msg type: ' + str(msg.type))
519+
520+
self._stream_impl(msg)
521+
except ConnectionResetError:
522+
msg.type = StreamMsgType.STREAM_DEAD
523+
self._stream_impl(msg)
524+
return
517525

518526
def _read_stream(self, n_bytes: int) -> bytearray:
519527
"""from_bytes

thetadata/enums.py

+1
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ class StreamMsgType(enum.Enum):
365365
ERROR = 11
366366
DISCONNECTED = 12
367367
RECONNECTED = 13
368+
STREAM_DEAD = 14
368369

369370
# Client data
370371
CONTRACT = 20

0 commit comments

Comments
 (0)