Skip to content

Commit ce35668

Browse files
committed
Add couple of extra unique token characters to requests
This is to ensure that the NUID does not become shared state for some reason and end up accidentally using the same response token for a request. Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent 84ac62f commit ce35668

File tree

4 files changed

+30
-17
lines changed

4 files changed

+30
-17
lines changed

nats/aio/client.py

+7-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import ssl
2121
import sys
2222
import time
23+
from secrets import token_hex
2324
from dataclasses import dataclass
2425
from email.parser import BytesParser
2526
from random import shuffle
@@ -56,7 +57,7 @@
5657
Subscription,
5758
)
5859

59-
__version__ = '2.1.6'
60+
__version__ = '2.1.7'
6061
__lang__ = 'python3'
6162
_logger = logging.getLogger(__name__)
6263
PROTOCOL = 1
@@ -168,12 +169,8 @@ def __init__(self) -> None:
168169
self._reconnection_task: Union[asyncio.Task[None], None] = None
169170
self._reconnection_task_future: Optional[asyncio.Future] = None
170171
self._max_payload: int = DEFAULT_MAX_PAYLOAD_SIZE
171-
# This is the client id that the NATS server knows
172-
# about. Useful in debugging application errors
173-
# when logged with this identifier along
174-
# with nats server log.
175-
# This would make more sense if we log the server
176-
# connected to as well in case of cluster setup.
172+
173+
# client id that the NATS server knows about.
177174
self._client_id: Optional[str] = None
178175
self._sid: int = 0
179176
self._subs: Dict[int, Subscription] = {}
@@ -929,8 +926,10 @@ async def _request_new_style(
929926
await self._init_request_sub()
930927
assert self._resp_sub_prefix
931928

932-
# Use a new NUID for the token inbox and then use the future.
929+
# Use a new NUID + couple of unique token bytes to identify the request,
930+
# then use the future to get the response.
933931
token = self._nuid.next()
932+
token.extend(token_hex(2).encode())
934933
inbox = self._resp_sub_prefix[:]
935934
inbox.extend(token)
936935
future: asyncio.Future = asyncio.Future()

nats/nuid.py

+4-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
# limitations under the License.
1313
#
1414

15-
from random import Random, SystemRandom
15+
from random import Random
1616
from sys import maxsize as MaxInt
17+
from secrets import token_bytes, randbelow
1718

1819
DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
1920
BASE = 62
@@ -33,8 +34,7 @@ class NUID:
3334
"""
3435

3536
def __init__(self) -> None:
36-
self._srand = SystemRandom()
37-
self._prand = Random(self._srand.randint(0, MaxInt))
37+
self._prand = Random(randbelow(MaxInt))
3838
self._seq = self._prand.randint(0, MAX_SEQ)
3939
self._inc = MIN_INC + self._prand.randint(BASE + 1, INC)
4040
self._prefix = bytearray()
@@ -60,9 +60,7 @@ def next(self) -> bytearray:
6060
return prefix
6161

6262
def randomize_prefix(self) -> None:
63-
random_bytes = (
64-
self._srand.getrandbits(8) for i in range(PREFIX_LENGTH)
65-
)
63+
random_bytes = token_bytes(PREFIX_LENGTH)
6664
self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes)
6765

6866
def reset_sequential(self) -> None:

tests/test_js.py

+16
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,22 @@ async def error_cb(err):
700700
assert len(msgs) <= 100
701701
assert sub.pending_msgs == 0
702702
assert sub.pending_bytes == 0
703+
704+
# Consumer has a single message pending but none in buffer.
705+
await js.publish("a3", b'last message')
706+
info = await sub.consumer_info()
707+
assert info.num_pending == 1
708+
assert sub.pending_msgs == 0
709+
710+
# Remove interest
711+
await sub.unsubscribe()
712+
with pytest.raises(TimeoutError):
713+
await sub.fetch(1, timeout=1)
714+
715+
# The pending message is still there, but not possible to consume.
716+
info = await sub.consumer_info()
717+
assert info.num_pending == 1
718+
703719
await nc.close()
704720

705721

todo.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
- [X] io_loop becomes loop parameter
2121
- [X] Drain Mode
2222
- [X] Connect timeout
23+
- [X] Adopt async/await in client
24+
- [X] Subscription object on subscribe
25+
- [X] Error handler yields the subscription
2326
- [ ] Use asyncio.Protocol
24-
- [ ] Adopt async/await in client
25-
- [ ] Subscription object on subscribe
26-
- [ ] Error handler yields the subscription

0 commit comments

Comments
 (0)