@@ -31,6 +31,8 @@ def __init__(self, handler_callback=None):
3131 self ._seen_block_hashes = set ()
3232 self ._to_trio = queue .Queue ()
3333 self ._to_asyncio = queue .Queue ()
34+ self ._peer_count = 0
35+ self ._peer_count_lock = threading .Lock ()
3436
3537 def register_handler (self , handler_callback ):
3638 self ._handler_callback = handler_callback
@@ -96,7 +98,8 @@ async def disconnect_peer(self, peer_addr):
9698
9799 @property
98100 def peer_count (self ) -> int :
99- return 0
101+ with self ._peer_count_lock :
102+ return self ._peer_count
100103
101104 async def _asyncio_reader (self ):
102105 while True :
@@ -132,6 +135,8 @@ async def _trio_main(self):
132135
133136 async def stream_handler (stream ):
134137 streams .append (stream )
138+ with self ._peer_count_lock :
139+ self ._peer_count += 1
135140 peer_id = stream .muxed_conn .peer_id
136141 addr = f"peer:{ peer_id } "
137142 self ._to_asyncio .put (("PEER_CONNECTED" , None ))
@@ -147,7 +152,10 @@ async def stream_handler(stream):
147152 self ._to_asyncio .put (("MSG" , msg ))
148153 except Exception : pass
149154 except Exception : pass
150- if stream in streams : streams .remove (stream )
155+ if stream in streams :
156+ streams .remove (stream )
157+ with self ._peer_count_lock :
158+ self ._peer_count -= 1
151159
152160 host .set_stream_handler (PROTOCOL_ID , stream_handler )
153161
@@ -185,7 +193,10 @@ async def check_queue():
185193 if addr == arg :
186194 try : await s .reset ()
187195 except Exception : pass
188- if s in streams : streams .remove (s )
196+ if s in streams :
197+ streams .remove (s )
198+ with self ._peer_count_lock :
199+ self ._peer_count -= 1
189200 except Exception : pass
190201 await trio .sleep (0.1 )
191202
0 commit comments