-
-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathdummysocket.py
84 lines (65 loc) · 1.76 KB
/
dummysocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""
dummsocket.py
Dummy socket class for testing SocketWrapper methods.
Created on 21 Aug 2024
:author: semuadmin
:copyright: SEMU Consulting © 2020
:license: BSD 3-Clause
"""
class DummySocket:
"""
Dummy socket class for testing SocketWrapper.
"""
def __init__(
self,
filename: str,
bufsize: int = 4096,
timeout: bool = False,
):
"""
Constructor.
Reads binary data from a file into a working buffer,
representing the socket stream.
:param str filename: filename
:param int bufsize: size of buffer
:param bool timeout: simulate TimeoutError?
"""
self._buffer = b""
self._timeout = timeout
with open(filename, "rb") as infile:
while len(self._buffer) < bufsize:
b = infile.read(16)
if b == b"":
break
self._buffer += b
def recv(self, n: int) -> bytes:
"""
Receive n bytes from dummy socket.
:param int n: number of bytes to read
:returns: bytes read
:rtype: bytes
"""
if self._timeout:
raise TimeoutError("simulated TimeoutError")
b = self._buffer[:n]
self._buffer = self._buffer[n:]
return b
def send(self, data: bytes) -> int:
"""
Send data to socket.
:param bytes data: data to send
:returns: number of bytes sent
:rtype: int
"""
print(f"data sent: {data}")
return len(data)
def sendall(self, data):
"""
Sendall data to socket.
"""
print(f"data sent: {data}")
def close(self):
"""
Close socket
"""
print("socket closed")