Skip to content

Commit 832949a

Browse files
committed
Refactor slave decoding loop.
1 parent fb5687a commit 832949a

File tree

1 file changed

+40
-42
lines changed

1 file changed

+40
-42
lines changed

kyototycoon/kyotoslave.py

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,42 @@
1919
OP_REMOVE = 0xa2
2020
OP_CLEAR = 0xa5
2121

22+
def _read_varnum(data):
23+
value = 0
24+
25+
for i, byte in enumerate(data):
26+
value = (value << 7) + (byte & 0x7f)
27+
28+
if byte < 0x80:
29+
return (value, data[i+1:])
30+
31+
return (0, data)
32+
33+
def _decode_log_entry(entry_data):
34+
sid, db, db_op = struct.unpack('!HHB', entry_data[:5])
35+
entry = {'sid': sid, 'db': db, 'op': db_op}
36+
37+
if db_op == OP_CLEAR:
38+
return entry
39+
40+
key_size, buf = _read_varnum(bytearray(entry_data[5:]))
41+
42+
if db_op == OP_REMOVE:
43+
entry['key'] = bytes(buf[:key_size])
44+
45+
return entry
46+
47+
if db_op == OP_SET:
48+
value_size, buf = _read_varnum(buf)
49+
50+
entry['key'] = bytes(buf[:key_size])
51+
entry['expires'], = struct.unpack('!Q', b'\x00\x00\x00' + bytes(buf[key_size:key_size+5]))
52+
entry['value'] = bytes(buf[key_size+5:key_size+value_size])
53+
54+
return entry
55+
56+
raise KyotoTycoonException('unsupported database operation [%s]' % hex(db_op))
57+
2258
class KyotoSlave(object):
2359
def __init__(self, sid, host='127.0.0.1', port=1978, timeout=30):
2460
'''Initialize a Kyoto Tycoon replication slave with ID "sid" to the specified master.'''
@@ -54,57 +90,19 @@ def consume(self, timestamp=None):
5490
if magic != MB_REPL:
5591
raise KyotoTycoonException('bad response [%s]' % hex(magic))
5692

57-
# Common log entry information...
58-
size, = struct.unpack('!I', self._read(4))
59-
sid, db, db_op = struct.unpack('!HHB', self._read(5))
60-
61-
entry = {'sid': sid, 'db': db, 'op': db_op}
93+
log_size, = struct.unpack('!I', self._read(4))
94+
entry = _decode_log_entry(self._read(log_size))
6295

63-
buf = bytearray(self._read(size - 5))
64-
65-
if sid == self.sid: # ...this must never happen!
96+
if entry['sid'] == self.sid: # ...this must never happen!
6697
raise KyotoTycoonException('bad log entry [sid=%d]' % sid)
6798

68-
if db_op == OP_CLEAR:
69-
yield entry
70-
continue
71-
72-
if db_op == OP_REMOVE:
73-
key_size, buf = self._read_varnum(buf)
74-
entry['key'] = bytes(buf[:key_size])
75-
76-
yield entry
77-
continue
78-
79-
if db_op == OP_SET:
80-
key_size, buf = self._read_varnum(buf)
81-
value_size, buf = self._read_varnum(buf)
82-
83-
entry['key'] = bytes(buf[:key_size])
84-
entry['expires'], = struct.unpack('!Q', b'\x00\x00\x00' + bytes(buf[key_size:key_size+5]))
85-
entry['value'] = bytes(buf[key_size+5:key_size+value_size])
86-
87-
yield entry
88-
continue
89-
90-
raise KyotoTycoonException('unsupported database operation [%s]' % hex(db_op))
99+
yield entry
91100

92101
def close(self):
93102
self.socket.shutdown(socket.SHUT_RDWR)
94103
self.socket.close()
95104
return True
96105

97-
def _read_varnum(self, data):
98-
value = 0
99-
100-
for i, byte in enumerate(data):
101-
value = (value << 7) + (byte & 0x7f)
102-
103-
if byte < 0x80:
104-
return (value, data[i+1:])
105-
106-
return (0, data)
107-
108106
def _write(self, data):
109107
self.socket.sendall(data)
110108

0 commit comments

Comments
 (0)