-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHashArmonicaUtils.py
354 lines (305 loc) · 15.4 KB
/
HashArmonicaUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
import socket
import json
import sys
import select
import datetime
class TryAgainError(Exception):
    ''' Plain Exception subclass with no extra behavior.
    NOTE(review): nothing in the visible code raises or catches it; presumably it tells
    a caller that an operation should be retried - confirm against the importers.
    '''
class SelfDestructError(Exception):
    ''' Plain Exception subclass with no extra behavior.
    NOTE(review): nothing in the visible code raises or catches it; presumably it asks
    the receiving component to shut itself down - confirm against the importers.
    '''
def now():
    ''' Return the current local time of day as a datetime.time (the date part is dropped). '''
    current_moment = datetime.datetime.now()
    return current_moment.time()
def rest(iterable):
    ''' Return an iterator over every element of 'iterable' except the first.
    The first element is consumed eagerly (at call time), not lazily.
    An empty iterable yields an empty iterator instead of leaking StopIteration
    to the caller (which would turn into a RuntimeError inside a generator).
    '''
    remaining = iter(iterable)
    # The default argument makes skipping a no-op when there is nothing to skip.
    next(remaining, None)
    return remaining
''' HashArmonicaNetworkUtils provides common, opinionated networking utilities that both the client and server RPC stubs utilize.
Here, we encapsulate our networking protocol, by which we mean how a message should be formatted over a TCP stream, and not the exact format of the message.
We define a message to be a Python JSON parsable object.
But again, these networking utilities merely provide convenience functions to send and receive such messages, but place no restrictions on the contents of the message itself (is it a dictionary? an array? what should the message shape be?).
We use a communication protocol we call L3 that describes a packet format, i.e., the relationship between bytes sent across the network (via a TCP connection, it is imagined) and the message communicated.
The format is as follows:
length of length (1 byte) | length (length of length bytes) | message (length bytes)
We call it 'L3' because every message is prefixed by its length and its length's length: the format is [L]ength of [L]ength, [L]ength, message.
This format is neat because it allows for very long messages with very little overhead.
L3 can support messages up to 2^2040 - 1 bytes (length of length may be up to 255, meaning length may occupy up to 255 bytes and so be as large as 2^(8*255) - 1), which is orders of magnitude greater than anything we'd need (we read that the total amount of data in the world is a few zetta - (2^70) - bytes).
The only price we pay for this flexibility, however, is a single extra byte in every packet.
That is, however many bytes it would take to send a message prepended by its length, it takes only one more to send a packet adhering to this app's communication protocol.
This contrasts with a protocol that includes the message length in a constant number of bytes at the beginning of a packet: this constant number will need to be large to accommodate large messages, but this means that small messages waste most of this space.
The L3 protocol gives us the ability both to support large messages but keep small ones compact.
'''
# Custom Exceptions to our communication protocol
class RequestFormatError(RuntimeError):
    ''' Signals an ill-formatted request (not JSON, or JSON of the wrong shape).

    The error message is 'msg' when one is given (and truthy); otherwise, when a truthy
    'obj' is given, it is a listing of the request shapes 'obj' accepts, derived from
    obj's public methods. With neither, the exception carries no arguments.
    '''

    def __init__(self, msg=None, obj=None):
        if msg:
            super().__init__(msg)
        elif obj:
            super().__init__(self._expected_format(obj))
        else:
            super().__init__()

    def _expected_format(self, obj):
        ''' Render one request template per public method of 'obj', one per line. '''
        templates = []
        for method, params in extract_doc(obj).items():
            template = {
                'method': method,
                'arguments': {arg: 'XXXXX' for arg in params},
            }
            templates.append(str(template))
        # Strip the quotes str() put around the placeholder so it reads as a blank to fill in.
        listing = '\n'.join(templates).replace("'XXXXX'", "XXXXX")
        return 'Expected the request to have one of the following forms:\n' + listing
# Utility reflection functions
def params_of_fxn(fxn):
    ''' Return the names of the positional parameters of 'fxn', excluding 'self'.
    'fxn' must expose a __code__ attribute (plain functions and bound methods do);
    otherwise AttributeError is raised.
    '''
    code_object = fxn.__code__
    return params_of_code(code_object)
def params_of_code(code):
    ''' Return the positional parameter names recorded in code object 'code', excluding 'self'.
    Keyword-only parameters and *args/**kwargs are not counted by co_argcount and so
    do not appear in the result.
    '''
    # The parameters are the first co_argcount entries of the local variable names.
    positional = code.co_varnames[:code.co_argcount]
    return list(filter(lambda name: name != 'self', positional))
def execute_operation(oper, obj):
    ''' Invoke the method named oper['method'] on 'obj' with oper['arguments'].

    'arguments' may be a sequence (passed positionally) or a dict mapping parameter
    names to values (passed as keywords). The dict form is the request shape that
    RequestFormatError advertises; splatting a dict positionally would hand the
    method its own parameter NAMES as values.

    Raises KeyError if 'oper' lacks a key, AttributeError if the method doesn't exist,
    TypeError on a wrong number/name of arguments, or anything the method itself raises.
    '''
    method = getattr(obj, oper['method'])
    arguments = oper['arguments']
    if isinstance(arguments, dict):
        return method(**arguments)
    return method(*arguments)
def extract_doc(obj):
    ''' Map each public callable attribute of 'obj' to the list of its parameter names.
    Attributes whose names start with '_' are treated as hidden and left out.
    '''
    doc = {}
    for method_name in dir(obj):
        method = getattr(obj, method_name)
        if not callable(method):
            continue
        if method_name.startswith("_"):
            continue
        doc[method_name] = params_of_fxn(method)
    return doc
def myip():
    ''' Return, as a string, the local IP address of the interface used to reach 8.8.8.8.
    A UDP socket is connected to 8.8.8.8:80 and the local address the OS bound it to is read back.
    May raise OSError (e.g. when no route to that address exists).
    '''
    # The context manager closes the socket even when connect()/getsockname() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect(("8.8.8.8", 80))
        return probe.getsockname()[0]
def project_eq(x):
    ''' Build a finder that returns the first entry of an iterable whose 'project' equals 'x'.
    Entries must support .get('project'). The finder raises StopIteration when nothing matches.
    '''
    def first_match(itr):
        for entry in itr:
            if x == entry.get('project'):
                return entry
        raise StopIteration
    return first_match
# a la https://stackoverflow.com/a/22498708
from threading import Thread, Event
def repeat(fxn, every):
    ''' Call 'fxn' every 'every' seconds on a background thread until cancelled.
    The first call happens after one full interval.
    Returns a zero-argument callable that stops the repetition.
    '''
    cancelled = Event()

    def run_periodically():
        while True:
            # wait() doubles as the sleep: it returns True as soon as the event is set.
            if cancelled.wait(every):
                break
            fxn()

    worker = Thread(target=run_periodically)
    worker.start()
    return cancelled.set
# Utility networking functions
''' Fundamental packet read/writes are implemented as file read/writes.
Files were decided to be the least common denominator because most
resources have the ability to be treated as a file.
The network level send/recv's, then, wrap these fundamental operations
by creating a file abstraction from the socket and, upon failure,
raise connection oriented errors.
A5: now that I use timeouts on my sockets, I should not use makefile on a socket
according to the documentation, as it may result in an inconsistent internal buffer.
'''
# nl (newline) protocol
# Maximum number of bytes requested per read()/recv() call when pulling newline-delimited messages.
BUFSIZ = 1024
def nl_messages(get_fxn):
    ''' Generate newline-delimited messages from the byte chunks produced by 'get_fxn'.
    'get_fxn' takes no arguments and returns the next byte-string chunk; a falsy
    chunk (e.g. b'') ends the stream.
    Messages are yielded without their trailing newline. Bytes received after the
    last newline are dropped when the stream ends.
    '''
    # A generator keeps leftover bytes between calls: one chunk may hold several
    # messages, or only part of one.
    pending = b''
    while True:
        chunk = get_fxn()
        if not chunk:
            return
        pending += chunk
        # Everything before the final separator is complete; the tail stays pending.
        *complete, pending = pending.split(b'\n')
        yield from complete
def nl_file_messages(from_file):
    ''' Generate the newline-delimited messages read from a file opened for binary reading.
    The file is read in chunks of up to BUFSIZ bytes.
    '''
    def read_chunk():
        return from_file.read(BUFSIZ)
    yield from nl_messages(read_chunk)
def nl_socket_messages(from_skt):
    ''' Generate the newline-delimited messages received from a socket.
    Data is received in chunks of up to BUFSIZ bytes; the generator ends when recv returns b''.
    '''
    def recv_chunk():
        return from_skt.recv(BUFSIZ)
    yield from nl_messages(recv_chunk)
def put_item(put_fxn, item):
    ''' Hand byte string 'item' to 'put_fxn' and return whatever 'put_fxn' returns.
    This indirection is a single place to add logic shared by every function
    that puts (sends/writes) items.
    '''
    outcome = put_fxn(item)
    return outcome
def write_item_2(to_file, item):
    ''' Write byte string 'item' to a file opened for binary writing; returns the write() result. '''
    def write_fxn(data):
        return to_file.write(data)
    return put_item(write_fxn, item)
def send_item_2(to_skt, item):
    ''' Send all of byte string 'item' over a socket via sendall; returns the sendall() result. '''
    def send_fxn(data):
        return to_skt.sendall(data)
    return put_item(send_fxn, item)
def build_nl_packet(raw_pyld):
    ''' Build an nl protocol packet: the payload 'raw_pyld' followed by one newline byte.
    The payload is not inspected; a payload containing a newline would be split by the receiver.
    '''
    terminator = b'\n'
    return raw_pyld + terminator
def send_nl_message(to_skt, msg):
    ''' Frame byte string 'msg' as an nl (newline-terminated) packet and send it over a socket. '''
    send_message(to_skt, msg, packet_builder=build_nl_packet)
def send_message(to_skt, msg, packet_builder):
    ''' Frame 'msg' with 'packet_builder' (bytes -> bytes) and send the packet over a socket. '''
    packet = packet_builder(msg)
    send_item_2(to_skt, packet)
# L3 (length of length, length) Protocol
def read_L3_message(from_file):
    ''' Read one L3 packet from a file and return its binary message.
    The file should be opened in binary mode with read permission.
    Packet layout: length of length (1 byte) | length (length of length bytes) | message (length bytes).
    Integers are big-endian. EOFError propagates if the file ends mid-packet.
    '''
    def read_uint(width):
        # Decode a big-endian unsigned integer stored in the next 'width' bytes.
        return int.from_bytes(read_n_bytes(from_file, width), 'big')

    length_width = read_uint(1)
    message_length = read_uint(length_width)
    return read_n_bytes(from_file, message_length)
def read_n_bytes(from_file, n):
    ''' Read and return exactly 'n' bytes from a file.
    The file should be opened in binary mode with read permission.
    Short reads are retried until 'n' bytes are collected.
    Raises EOFError if the file is exhausted first.
    '''
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = from_file.read(remaining)
        size = len(chunk)
        if size == 0:
            raise EOFError
        chunks.append(chunk)
        remaining -= size
    return b''.join(chunks)
def receive_L3_message(from_skt):
    ''' Receive one L3 communication packet from a socket and return its binary message.
    Wrapper for read_L3_message.
    Raises ConnectionError if the stream ends before a whole packet is received.
    '''
    try:
        # buffering=0 gives an unbuffered file. A buffered reader may pull bytes that
        # belong to the NEXT packet off the socket, and they would be thrown away
        # when this temporary file object is closed.
        with from_skt.makefile('rb', buffering=0) as skt_file:
            return read_L3_message(skt_file)
    except EOFError:
        # Some basic research suggests that while in general a socket may recv 0 bytes for reasons other than the cxn breaking,
        # this is the only reason a *blocking, streaming* socket like this one will recv 0 bytes (o/w it will block until it recvs)
        # NOTE(review): getsockname() is this socket's own address, not the peer's - confirm the message is intended.
        raise ConnectionError(f'Lost connection with {":".join(map(str, from_skt.getsockname()))}')
def decode_object(raw_obj):
    ''' Convert utf-8 encoded JSON bytes into the Python data structure they describe.
    Raises RequestFormatError (chained to the underlying error) when the bytes
    are not valid utf-8 or not valid JSON.
    '''
    try:
        text = raw_obj.decode('utf-8')
        return json.loads(text)
    except (UnicodeDecodeError, json.decoder.JSONDecodeError) as e:
        raise RequestFormatError('object is not valid JSON encoded in utf-8') from e
def encode_object(obj):
    ''' Serialize a Python data structure to JSON and return it as utf-8 bytes.
    Serialization errors raised by json.dumps propagate to the caller.
    '''
    text = json.dumps(obj)
    return text.encode('utf-8')
def write_item(to_file, raw_item):
    ''' Write binary 'raw_item' to a file; returns None.
    The file should be opened in binary mode with write permission.
    May raise OSError if the write fails.
    '''
    # Retries (buffers full, disk busy, etc) are left to the file object's write.
    write = to_file.write
    write(raw_item)
def send_item(to_skt, raw_item):
    ''' Send a binary sequence over a socket through a temporary file wrapper.
    Wrapper for write_item.
    Any OSError from writing (or from the flush when the wrapper closes) is
    re-raised as ConnectionError.
    '''
    try:
        with to_skt.makefile('wb') as outgoing:
            write_item(outgoing, raw_item)
    except OSError:
        # Some basic research suggests that while in general a socket may send 0 bytes for reasons other than the cxn breaking,
        # this is the only reason a *blocking* socket like this one will send 0 bytes (o/w it will block until it can be sent)
        raise ConnectionError(f'Lost connection with {":".join(map(str, to_skt.getsockname()))}')
def build_L3_packet(raw_pyld):
    ''' Build an L3 communication packet from binary message 'raw_pyld'.
    Packet layout: length of length (1 byte) | length (length of length bytes) | message (length bytes).
    Integers are big-endian. An empty payload produces the single byte b'\\x00'.
    Raises RuntimeError if the payload length cannot be expressed in 255 bytes.
    '''
    pyld_len = len(raw_pyld)
    # Number of whole bytes needed to hold pyld_len (0 for an empty payload).
    pyld_len_len = (pyld_len.bit_length() + 7) // 8
    # Practically unreachable: it would take a payload longer than 2^(8*255) - 1 bytes.
    # The exception must be raised, though; otherwise to_bytes(1, ...) below would
    # fail with an unrelated OverflowError.
    if pyld_len_len > 255:
        raise RuntimeError('Payload size exceeds the maximum the protocol can support')
    return b''.join((
        pyld_len_len.to_bytes(1, 'big'),
        pyld_len.to_bytes(pyld_len_len, 'big'),
        raw_pyld,
    ))
def send_L3_message(to_skt, raw_msg):
    ''' Wrap binary 'raw_msg' in an L3 communication packet and send it over a socket.
    Wrapper for send_message.
    '''
    send_message(to_skt, raw_msg, packet_builder=build_L3_packet)
def err_desc(err):
    ''' Return the error message, without the annoying string wrapping by KeyError.
    Ex: str(KeyError('string')) = "'string'".
        err_desc(KeyError('string')) = 'string'.
    KeyError subclasses that inherit KeyError's __str__ are unwrapped the same way;
    a subclass that defines its own __str__ is reported through that __str__.
    '''
    uses_keyerror_str = isinstance(err, KeyError) and type(err).__str__ is KeyError.__str__
    if uses_keyerror_str:
        return ",".join(map(str, err.args))
    return str(err)
''' Server
'''
class Server():
    ''' Single-threaded, select-based request/response server speaking the nl (newline) protocol.

    Constructing a Server runs it: the constructor binds a listening socket, then polls
    and serves forever, so it only ever exits by raising.
    '''

    def __init__(self, program, proj_suffix=''):
        ''' Listen on an OS-chosen port and serve requests against 'program' forever.

        'program' supplies the operations to dispatch to and must have a 'verbose'
        attribute; its 'port' attribute is set to the listening port. If it also has
        'cluster_name', 'nodeid' and 'catalog', the server registers itself under the
        name cluster_name + nodeid + proj_suffix.

        Each request is one newline-terminated JSON message; each gets one
        newline-terminated JSON response (see build_response).
        Unexpected exceptions propagate after every open socket has been closed.
        '''
        # open listening socket
        new_cxns = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Every polled socket -> its peer address; the listening socket maps to None.
        socket_to_addr = {new_cxns: None}
        # Each client socket -> its generator of newline-delimited messages.
        socket_to_msgs = {}
        try:
            new_cxns.bind((socket.gethostname(), 0))
            new_cxns.listen()
            program.port = new_cxns.getsockname()[1]
            if program.verbose: print(f'Listening on port {program.port}...')
            try:
                name = program.cluster_name + str(program.nodeid) + proj_suffix
                program.catalog.register('chord', name, program.port, 'tfisher4') # Spawns registration thread
                if program.verbose: print(f'Registered as {name} to catalog service...')
            except AttributeError:
                # Best effort: a program without catalog/cluster attributes is simply not registered.
                pass
            while True: # Poll forever
                readable, _, _ = select.select(socket_to_addr, [], []) # blocks until >= 1 skt ready
                for rd_skt in readable:
                    if socket_to_addr[rd_skt] is None:
                        # The listening socket is readable: a new connection is waiting.
                        new_skt, addr = new_cxns.accept()
                        socket_to_addr[new_skt] = addr
                        socket_to_msgs[new_skt] = nl_socket_messages(new_skt)
                        if program.verbose: print(f'Accepted connection with {addr[0]}:{addr[1]}...')
                        continue
                    addr = socket_to_addr[rd_skt]
                    try: # Assume cxn unbreakable, client waiting for rsp
                        try: # Assume request is valid JSON in correct format corresponding to valid operation
                            try: # Assume request is valid JSON encoded via utf-8
                                # NOTE(review): exactly one message is consumed per readiness event.
                                # Extra messages received in the same chunk wait in the generator
                                # until the socket is readable again, and a partial message blocks
                                # this loop until its newline arrives.
                                request = decode_object(next(socket_to_msgs[rd_skt]))
                            except RequestFormatError as e:
                                raise BadRequestError(e)
                            if program.verbose: print(f'Received request {request} from {addr[0]}:{addr[1]}...')
                            res = self.dispatch(program, request)
                        except BadRequestError as e:
                            res = e
                        rsp = self.build_response(res)
                        send_nl_message(rd_skt, encode_object(rsp))
                        if program.verbose: print(f'Sent response {rsp} to {addr[0]}:{addr[1]}...')
                    except (ConnectionError, StopIteration):
                        # StopIteration: the message generator ended because recv returned b''.
                        if program.verbose: print(f'Lost connection with {addr[0]}:{addr[1]}.')
                        rd_skt.close()
                        socket_to_addr.pop(rd_skt)
                        socket_to_msgs.pop(rd_skt)
                    # Other exceptions are unexpected: let them run their course
        finally:
            # The loop above only exits by exception, so cleanup has to live here:
            # close the listening socket and every client socket still open.
            for skt in socket_to_addr:
                skt.close()

    def dispatch(self, program, request):
        ''' Process 'request' by dispatching to the appropriate method of 'program'.
        Any exception raised while doing so is re-raised wrapped in BadRequestError.
        '''
        try:
            return execute_operation(request, program)
        except Exception as e:
            raise BadRequestError(e)

    def build_response(self, result):
        ''' Builds a response object from a result.
        A result may either be a BadRequestError wrapping an underlying error,
        or the result returned by a valid operation.
        '''
        if isinstance(result, BadRequestError):
            cause = result.cause
            return {
                'status': f'{type(cause).__module__}.{type(cause).__name__}', # Identify error to be raised client-side
                'description': err_desc(cause) # Pass also the error description to report client-side
            }
        return {
            'status': 'success',
            'result': result
        }
class BadRequestError(RuntimeError):
    ''' An internal exception used to distinguish exceptions expected due to bad requests.
    Wraps the underlying exception as 'cause'; str() of this error is str() of the cause.
    '''

    def __init__(self, cause):
        super().__init__()
        # Kept as an attribute so the cause's type and message remain available to handlers.
        self.cause = cause

    def __str__(self):
        underlying = self.cause
        return str(underlying)