Skip to content

Commit 2cb88f3

Browse files
committed
Finish type hinting
1 parent 23230ef commit 2cb88f3

File tree

3 files changed

+12
-8
lines changed

3 files changed

+12
-8
lines changed

dispatcher/brokers/pg_notify.py

+3
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ def get_publish_channel(self, channel: Optional[str] = None) -> str:
100100

101101
raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config')
102102

103+
def __str__(self) -> str:
104+
return 'pg_notify-broker'
105+
103106
# --- asyncio connection methods ---
104107

105108
async def aget_connection(self) -> psycopg.AsyncConnection:

dispatcher/brokers/socket.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ def __init__(self, client_id: int, reader: asyncio.StreamReader, writer: asyncio
2121
self.yield_clear = asyncio.Event()
2222
self.replies_to_send: list = []
2323

24-
def write(self, message) -> None:
24+
def write(self, message: str, /) -> None:
2525
self.writer.write((message + '\n').encode())
2626

27-
def queue_reply(self, reply: str) -> None:
27+
def queue_reply(self, reply: str, /) -> None:
2828
self.replies_to_send.append(reply)
2929

30-
async def send_replies(self):
30+
async def send_replies(self) -> None:
3131
for reply in self.replies_to_send.copy():
3232
logger.info(f'Sending reply to client_id={self.client_id} len={len(reply)}')
3333
self.write(reply)
@@ -54,8 +54,8 @@ def __init__(self, socket_path: str) -> None:
5454
self.sock: Optional[socket.socket] = None # for synchronous clients
5555
self.incoming_queue: asyncio.Queue = asyncio.Queue()
5656

57-
def __str__(self):
58-
return f'socket-producer-{self.socket_path}'
57+
def __str__(self) -> str:
58+
return f'socket-broker-{self.socket_path}'
5959

6060
async def _add_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
6161
client = Client(self.client_ct, reader, writer)
@@ -190,10 +190,11 @@ def process_notify(
190190
finally:
191191
self.sock = None
192192

193-
def _publish_from_sock(self, sock, message) -> None:
193+
def _publish_from_sock(self, sock: socket.socket, message: str) -> None:
194194
sock.sendall((message + "\n").encode())
195195

196-
def publish_message(self, channel=None, message=None) -> None:
196+
def publish_message(self, channel: Optional[str] = None, message: Optional[str] = None) -> None:
197+
assert isinstance(message, str)
197198
if self.sock:
198199
logger.info(f'Publishing socket message len={len(message)} via existing connection')
199200
self._publish_from_sock(self.sock, message)

dispatcher/producers/brokered.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def __init__(self, broker: Broker) -> None:
1515
self.dispatcher: Optional[DispatcherMain] = None
1616
super().__init__()
1717

18-
def __str__(self):
18+
def __str__(self) -> str:
1919
return f'brokered-producer-{self.broker}'
2020

2121
async def start_producing(self, dispatcher: DispatcherMain) -> None:

0 commit comments

Comments
 (0)