diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
index 76224e8e..01a12c54 100644
--- a/.github/workflows/check.yml
+++ b/.github/workflows/check.yml
@@ -17,7 +17,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v4
         with:
-          python-version: "3.8"
+          python-version: "3.12"

       - name: Install dependencies
         run: |
diff --git a/.travis.yml b/.travis.yml
index 2c7578c1..7e8905f8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -16,7 +16,7 @@ before_script:
   - export PATH=$HOME/nats-server:$PATH

 install:
-  - pip install -e .[nkeys,aiohttp,fast-mail-parser]
+  - pip install -e .[nkeys,aiohttp,fast-mail-parser,orjson,uuid_utils]

 script:
   - make ci
@@ -39,6 +39,16 @@ jobs:
         - bash ./scripts/install_nats.sh
       install:
         - pip install -e .[fast-mail-parser]
+    - name: "Python: 3.13/orjson + uuid_utils"
+      python: "3.13"
+      before_install:
+        - sudo apt update && sudo apt install gcc build-essential -y
+        - sudo apt-get install python3-pip
+        - sudo apt-get install python3-pytest
+        - pip install --upgrade pip
+        - bash ./scripts/install_nats.sh
+      install:
+        - pip install -e .[fast-mail-parser,orjson,uuid_utils]
     - name: "Python: 3.12"
       python: "3.12"
       before_install:
@@ -49,6 +59,16 @@ jobs:
         - bash ./scripts/install_nats.sh
       install:
         - pip install -e .[fast-mail-parser]
+    - name: "Python: 3.12/orjson + uuid_utils"
+      python: "3.12"
+      before_install:
+        - sudo apt update && sudo apt install gcc build-essential -y
+        - sudo apt-get install python3-pip
+        - sudo apt-get install python3-pytest
+        - pip install --upgrade pip
+        - bash ./scripts/install_nats.sh
+      install:
+        - pip install -e .[fast-mail-parser,orjson,uuid_utils]
     - name: "Python: 3.11"
       python: "3.11"
       before_install:
@@ -59,6 +79,16 @@ jobs:
        - bash ./scripts/install_nats.sh
       install:
        - pip install -e .[fast-mail-parser]
+    - name: "Python: 3.11/orjson + uuid_utils"
+      python: "3.11"
+      before_install:
+        - sudo apt update && sudo apt install gcc build-essential -y
+        - sudo apt-get install python3-pip
+        - sudo apt-get install python3-pytest
+        - pip install --upgrade pip
+        - bash ./scripts/install_nats.sh
+      install:
+        - pip install -e .[fast-mail-parser,orjson,uuid_utils]
     - name: "Python: 3.11/uvloop"
       python: "3.11"
       before_install:
@@ -85,7 +115,10 @@ jobs:
   allow_failures:
     - name: "Python: 3.8"
     - name: "Python: 3.11"
+    - name: "Python: 3.11/orjson + uuid_utils"
     - name: "Python: 3.11/uvloop"
     - name: "Python: 3.11 (nats-server@main)"
     - name: "Python: 3.12"
+    - name: "Python: 3.12/orjson + uuid_utils"
     - name: "Python: 3.13"
+    - name: "Python: 3.13/orjson + uuid_utils"
diff --git a/nats/aio/client.py b/nats/aio/client.py
index 25253bc6..a6f09b4a 100644
--- a/nats/aio/client.py
+++ b/nats/aio/client.py
@@ -17,7 +17,6 @@
 import asyncio
 import base64
 import ipaddress
-import json
 import logging
 import re
 import ssl
@@ -33,6 +32,8 @@
 from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
 from urllib.parse import ParseResult, urlparse

+from nats.json_util import JsonUtil as json
+
 try:
     from fast_mail_parser import parse_email
 except ImportError:
@@ -1621,8 +1622,8 @@ def _connect_command(self) -> bytes:
         if self.options["no_echo"] is not None:
             options["echo"] = not self.options["no_echo"]

-        connect_opts = json.dumps(options, sort_keys=True)
-        return b"".join([CONNECT_OP + _SPC_ + connect_opts.encode() + _CRLF_])
+        connect_opts = json.dump_bytes(options, sort_keys=True)
+        return b"".join([CONNECT_OP + _SPC_ + connect_opts + _CRLF_])

     async def _process_ping(self) -> None:
         """
diff --git a/nats/aio/msg.py b/nats/aio/msg.py
index 5724706a..835b6281 100644
--- a/nats/aio/msg.py
+++ b/nats/aio/msg.py
@@ -14,11 +14,11 @@
 from __future__ import annotations

 import datetime
-import json
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Dict, List, Optional, Union

 from nats.errors import Error, MsgAlreadyAckdError, NotJSMessageError
+from nats.json_util import JsonUtil as json

 if TYPE_CHECKING:
     from nats import NATS
@@ -132,7 +132,7 @@ async def nak(self, delay: Union[int, float, None] = None) -> None:
         if delay:
             json_args["delay"] = int(delay * 10**9)  # from seconds to ns
         if json_args:
-            payload += b" " + json.dumps(json_args).encode()
+            payload += b" " + json.dump_bytes(json_args)
         await self._client.publish(self.reply, payload)
         self._ackd = True

diff --git a/nats/aio/subscription.py b/nats/aio/subscription.py
index 31fbb887..f502c9a4 100644
--- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -23,12 +23,17 @@
     List,
     Optional,
 )
-from uuid import uuid4

 from nats import errors

 # Default Pending Limits of Subscriptions
 from nats.aio.msg import Msg

+# Use uuid_utils if available, otherwise use the standard library
+try:
+    from uuid_utils import uuid4
+except ImportError:
+    from uuid import uuid4
+
 if TYPE_CHECKING:
     from nats.js import JetStreamContext
diff --git a/nats/js/client.py b/nats/js/client.py
index d26413c0..68aea6f1 100644
--- a/nats/js/client.py
+++ b/nats/js/client.py
@@ -15,7 +15,6 @@
 from __future__ import annotations

 import asyncio
-import json
 import time
 from email.parser import BytesParser
 from secrets import token_hex
@@ -50,6 +49,7 @@
     VALID_BUCKET_RE,
     ObjectStore,
 )
+from nats.json_util import JsonUtil as json

 if TYPE_CHECKING:
     from nats import NATS
@@ -1135,7 +1135,7 @@ async def _fetch_one(

         await self._nc.publish(
             self._nms,
-            json.dumps(next_req).encode(),
+            json.dump_bytes(next_req),
             self._deliver,
         )

@@ -1220,7 +1220,7 @@ async def _fetch_n(
             next_req["no_wait"] = True
             await self._nc.publish(
                 self._nms,
-                json.dumps(next_req).encode(),
+                json.dump_bytes(next_req),
                 self._deliver,
             )
             await asyncio.sleep(0)
@@ -1286,7 +1286,7 @@ async def _fetch_n(

         await self._nc.publish(
             self._nms,
-            json.dumps(next_req).encode(),
+            json.dump_bytes(next_req),
             self._deliver,
         )
         await asyncio.sleep(0)
diff --git a/nats/js/manager.py b/nats/js/manager.py
index bfd5937f..f385f143 100644
--- a/nats/js/manager.py
+++ b/nats/js/manager.py
@@ -15,13 +15,13 @@
 from __future__ import annotations

 import base64
-import json
 from email.parser import BytesParser
 from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional

 from nats.errors import NoRespondersError
 from nats.js import api
 from nats.js.errors import APIError, NotFoundError, ServiceUnavailableError
+from nats.json_util import JsonUtil as json

 if TYPE_CHECKING:
     from nats import NATS
@@ -60,9 +60,9 @@ async def find_stream_name_by_subject(self, subject: str) -> str:
         """

         req_sub = f"{self._prefix}.STREAM.NAMES"
-        req_data = json.dumps({"subject": subject})
+        req_data = json.dump_bytes({"subject": subject})
         info = await self._api_request(
-            req_sub, req_data.encode(), timeout=self._timeout
+            req_sub, req_data, timeout=self._timeout
         )
         if not info["streams"]:
             raise NotFoundError
@@ -76,12 +76,12 @@ async def stream_info(
         """
         Get the latest StreamInfo by stream name.
         """
-        req_data = ""
+        req_data = b""
         if subjects_filter:
-            req_data = json.dumps({"subjects_filter": subjects_filter})
+            req_data = json.dump_bytes({"subjects_filter": subjects_filter})
         resp = await self._api_request(
             f"{self._prefix}.STREAM.INFO.{name}",
-            req_data.encode(),
+            req_data,
             timeout=self._timeout,
         )
         return api.StreamInfo.from_response(resp)
@@ -114,10 +114,10 @@ async def add_stream(
                 "path separators (forward or backward slash), or non-printable characters."
             )

-        data = json.dumps(config.as_dict())
+        data = json.dump_bytes(config.as_dict())
         resp = await self._api_request(
             f"{self._prefix}.STREAM.CREATE.{stream_name}",
-            data.encode(),
+            data,
             timeout=self._timeout,
         )
         return api.StreamInfo.from_response(resp)
@@ -136,10 +136,10 @@ async def update_stream(
         if config.name is None:
             raise ValueError("nats: stream name is required")

-        data = json.dumps(config.as_dict())
+        data = json.dump_bytes(config.as_dict())
         resp = await self._api_request(
             f"{self._prefix}.STREAM.UPDATE.{config.name}",
-            data.encode(),
+            data,
             timeout=self._timeout,
         )
         return api.StreamInfo.from_response(resp)
@@ -171,11 +171,9 @@ async def purge_stream(
         if keep:
             stream_req["keep"] = keep

-        req = json.dumps(stream_req)
+        req = json.dump_bytes(stream_req)
         resp = await self._api_request(
-            f"{self._prefix}.STREAM.PURGE.{name}",
-            req.encode(),
-            timeout=self._timeout
+            f"{self._prefix}.STREAM.PURGE.{name}", req, timeout=self._timeout
         )
         return resp["success"]

@@ -198,9 +196,7 @@ async def streams_info(self, offset=0) -> List[api.StreamInfo]:
         """
         resp = await self._api_request(
             f"{self._prefix}.STREAM.LIST",
-            json.dumps({
-                "offset": offset
-            }).encode(),
+            json.dump_bytes({"offset": offset}),
             timeout=self._timeout,
         )
         streams = []
@@ -216,9 +212,7 @@ async def streams_info_iterator(self,
         """
         resp = await self._api_request(
             f"{self._prefix}.STREAM.LIST",
-            json.dumps({
-                "offset": offset
-            }).encode(),
+            json.dump_bytes({"offset": offset}),
             timeout=self._timeout,
         )

@@ -240,7 +234,7 @@ async def add_consumer(
         config = config.evolve(**params)
         durable_name = config.durable_name
         req = {"stream_name": stream, "config": config.as_dict()}
-        req_data = json.dumps(req).encode()
+        req_data = json.dump_bytes(req)

         resp = None
         subject = ""
@@ -283,9 +277,7 @@ async def consumers_info(
         """
         resp = await self._api_request(
             f"{self._prefix}.CONSUMER.LIST.{stream}",
-            b"" if offset is None else json.dumps({
-                "offset": offset
-            }).encode(),
+            b"" if offset is None else json.dump_bytes({"offset": offset}),
             timeout=self._timeout,
         )
         consumers = []
@@ -318,19 +310,19 @@ async def get_msg(
                 req["last_by_subj"] = None
                 req.pop("last_by_subj", None)
                 req["next_by_subj"] = subject
-        data = json.dumps(req)
+        data = json.dump_bytes(req)

         if direct:
             # $JS.API.DIRECT.GET.KV_{stream_name}.$KV.TEST.{key}
             if subject and (seq is None):
                 # last_by_subject type request requires no payload.
-                data = ""
+                data = b""
                 req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}"
             else:
                 req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}"

             resp = await self._nc.request(
-                req_subject, data.encode(), timeout=self._timeout
+                req_subject, data, timeout=self._timeout
             )
             raw_msg = JetStreamManager._lift_msg_to_raw_msg(resp)
             return raw_msg
@@ -338,7 +330,7 @@ async def get_msg(
         # Non Direct form
         req_subject = f"{self._prefix}.STREAM.MSG.GET.{stream_name}"
         resp_data = await self._api_request(
-            req_subject, data.encode(), timeout=self._timeout
+            req_subject, data, timeout=self._timeout
         )

         raw_msg = api.RawStreamMsg.from_response(resp_data["message"])
@@ -389,8 +381,8 @@ async def delete_msg(self, stream_name: str, seq: int) -> bool:
         """
         req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}"
         req = {"seq": seq}
-        data = json.dumps(req)
-        resp = await self._api_request(req_subject, data.encode())
+        data = json.dump_bytes(req)
+        resp = await self._api_request(req_subject, data)
         return resp["success"]

     async def get_last_msg(
diff --git a/nats/js/object_store.py b/nats/js/object_store.py
index 70ce3d3b..8610e607 100644
--- a/nats/js/object_store.py
+++ b/nats/js/object_store.py
@@ -15,7 +15,6 @@
 import asyncio
 import base64
 import io
-import json
 import re
 from dataclasses import dataclass
 from datetime import datetime, timezone
@@ -34,6 +33,7 @@
     ObjectNotFoundError,
 )
 from nats.js.kv import MSG_ROLLUP_SUBJECT
+from nats.json_util import JsonUtil as json

 VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
 VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
@@ -343,7 +343,7 @@ async def put(
         try:
             await self._js.publish(
                 meta_subj,
-                json.dumps(info.as_dict()).encode(),
+                json.dump_bytes(info.as_dict()),
                 headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
             )
         except Exception as err:
@@ -412,7 +412,7 @@ async def update_meta(
         try:
             await self._js.publish(
                 meta_subj,
-                json.dumps(info.as_dict()).encode(),
+                json.dump_bytes(info.as_dict()),
                 headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
             )
         except Exception as err:
@@ -538,7 +538,7 @@ async def delete(self, name: str) -> ObjectResult:
         try:
             await self._js.publish(
                 meta_subj,
-                json.dumps(info.as_dict()).encode(),
+                json.dump_bytes(info.as_dict()),
                 headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
             )
         finally:
diff --git a/nats/json_util.py b/nats/json_util.py
new file mode 100644
index 00000000..516ef36b
--- /dev/null
+++ b/nats/json_util.py
@@ -0,0 +1,91 @@
+"""A module providing a utility class for handling JSON-related operations."""
+
+import json
+from typing import Any
+
+try:
+    import orjson
+except ImportError:
+    orjson = None
+
+
+class JsonUtil:
+    """A utility class for handling JSON serialization operations.
+    This class provides static methods for converting between Python objects and JSON
+    strings/bytes. It uses the `orjson` library when available for its performance
+    advantages, falling back to the standard `json` library when `orjson` is not
+    installed.
+
+    The class handles compatibility between the two libraries, particularly for options
+    like 'sort_keys' which have different implementations in each library. All methods
+    maintain consistent behavior regardless of which JSON library is being used.
+
+    Methods:
+        dumps(obj, *args, **kwargs) -> str: Converts object to JSON string
+        dump_bytes(obj, *args, **kwargs) -> bytes: Converts object to JSON bytes
+        loads(s, *args, **kwargs) -> Any: Parses JSON string into Python object
+    """
+
+    @staticmethod
+    def _handle_sort_keys(kwargs):
+        """Internal helper to handle sort_keys parameter for orjson compatibility.
+        Args:
+            kwargs: The keyword arguments dictionary to modify
+        Returns:
+            Modified kwargs dictionary with orjson-compatible options
+        """
+        if kwargs.pop("sort_keys", False):
+            option = kwargs.get("option", 0) | orjson.OPT_SORT_KEYS
+            kwargs["option"] = option
+        return kwargs
+
+    @staticmethod
+    def dumps(obj, *args, **kwargs) -> str:
+        """Convert a Python object into a JSON string.
+        Args:
+            obj: The data to be converted
+            *args: Extra arguments to pass to the dumps() function
+            **kwargs: Extra keyword arguments to pass to the dumps() function.
+                Special handling for 'sort_keys' which is translated to
+                orjson.OPT_SORT_KEYS when using orjson.
+        Returns:
+            str: A JSON string representation of obj
+        """
+        if orjson is None:
+            return json.dumps(obj, *args, **kwargs)
+        else:
+            kwargs = JsonUtil._handle_sort_keys(kwargs)
+            return orjson.dumps(obj, *args, **kwargs).decode("utf-8")
+
+    @staticmethod
+    def dump_bytes(obj, *args, **kwargs) -> bytes:
+        """Convert a Python object into a JSON bytes string.
+        Args:
+            obj: The data to be converted
+            *args: Extra arguments to pass to the dumps() function
+            **kwargs: Extra keyword arguments to pass to the dumps() function.
+                Special handling for 'sort_keys' which is translated to
+                orjson.OPT_SORT_KEYS when using orjson.
+        Returns:
+            bytes: A JSON bytes string representation of obj
+        """
+        if orjson is None:
+            return json.dumps(obj, *args, **kwargs).encode("utf-8")
+        else:
+            kwargs = JsonUtil._handle_sort_keys(kwargs)
+            return orjson.dumps(obj, *args, **kwargs)
+
+    @staticmethod
+    def loads(s: str, *args, **kwargs) -> Any:
+        """Parse a JSON string into a Python object.
+        Args:
+            s: The JSON string to be parsed
+            *args: Extra arguments to pass to the orjson.loads() function
+            **kwargs: Extra keyword arguments to pass to the orjson.loads() function
+        Returns:
+            The Python representation of s
+        """
+        if orjson is None:
+            return json.loads(s, *args, **kwargs)
+        else:
+            return orjson.loads(s, *args, **kwargs)
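A quick usage sketch of the wrapper above (illustrative only, not part of the patch; the payload and variable names are made up). Note that the serialized bytes are not byte-for-byte identical across backends: the stdlib emits spaces after ':' and ',', while orjson produces compact output, which is why the CONNECT tests further down adjust their expected string when orjson is importable.

    # Round-trip through JsonUtil; behaves the same whether or not orjson is installed.
    from nats.json_util import JsonUtil as json

    payload = {"protocol": 1, "echo": True}
    raw = json.dump_bytes(payload, sort_keys=True)
    # b'{"echo":true,"protocol":1}' with orjson, b'{"echo": true, "protocol": 1}' with stdlib json
    assert json.loads(raw) == payload  # loads() delegates to whichever backend is active
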
diff --git a/nats/micro/service.py b/nats/micro/service.py
index 159530a3..37343537 100644
--- a/nats/micro/service.py
+++ b/nats/micro/service.py
@@ -1,6 +1,5 @@
 from __future__ import annotations

-import json
 import re
 import time
 from asyncio import Event
@@ -21,6 +20,7 @@
 from nats.aio.client import Client
 from nats.aio.msg import Msg
 from nats.aio.subscription import Subscription
+from nats.json_util import JsonUtil as json

 from .request import Handler, Request, ServiceError

@@ -859,18 +859,18 @@ async def _handle_ping_request(self, msg: Msg) -> None:
             metadata=self._metadata,
         ).to_dict()

-        await msg.respond(data=json.dumps(ping).encode())
+        await msg.respond(data=json.dump_bytes(ping))

     async def _handle_info_request(self, msg: Msg) -> None:
         """Handle an info message."""
         info = self.info().to_dict()

-        await msg.respond(data=json.dumps(info).encode())
+        await msg.respond(data=json.dump_bytes(info))

     async def _handle_stats_request(self, msg: Msg) -> None:
         """Handle a stats message."""
         stats = self.stats().to_dict()
-        await msg.respond(data=json.dumps(stats).encode())
+        await msg.respond(data=json.dump_bytes(stats))


 def control_subject(
diff --git a/nats/protocol/parser.py b/nats/protocol/parser.py
index 6b8c7255..8f45650b 100644
--- a/nats/protocol/parser.py
+++ b/nats/protocol/parser.py
@@ -17,11 +17,11 @@

 from __future__ import annotations

-import json
 import re
 from typing import Any, Dict

 from nats.errors import ProtocolError
+from nats.json_util import JsonUtil as json

 MSG_RE = re.compile(
     b"\\AMSG\\s+([^\\s]+)\\s+([^\\s]+)\\s+(([^\\s]+)[^\\S\r\n]+)?(\\d+)\r\n"
diff --git a/pyproject.toml b/pyproject.toml
index a3955ca1..db8d0633 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,8 @@ classifiers = [
 nkeys = ['nkeys']
 aiohttp = ['aiohttp']
 fast_parse = ['fast-mail-parser']
+orjson = ['orjson']
+uuid_utils = ['uuid_utils']

 [tool.setuptools]
 zip-safe = true
diff --git a/setup.py b/setup.py
index 8b60030c..8fb8759d 100644
--- a/setup.py
+++ b/setup.py
@@ -10,6 +10,8 @@
         "nkeys": ["nkeys"],
         "aiohttp": ["aiohttp"],
         "fast_parse": ["fast-mail-parser"],
+        "orjson": ["orjson"],
+        "uuid_utils": ["uuid_utils"],
     },
     packages=["nats", "nats.aio", "nats.micro", "nats.protocol", "nats.js"],
     package_data={"nats": ["py.typed"]},
diff --git a/tests/test_client.py b/tests/test_client.py
index e16ee09a..39a0f4ca 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -1,6 +1,5 @@
 import asyncio
 import http.client
-import json
 import os
 import ssl
 import time
@@ -12,6 +11,7 @@
 import nats.errors
 import pytest
 from nats.aio.client import Client as NATS, ServerVersion, __version__
+from nats.json_util import JsonUtil as json
 from tests.utils import (
     ClusteringDiscoveryAuthTestCase,
     ClusteringTestCase,
@@ -37,6 +37,14 @@ def test_default_connect_command(self):
         nc.options["no_echo"] = False
         got = nc._connect_command()
         expected = f'CONNECT {{"echo": true, "lang": "python3", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n'
+
+        try:
+            import orjson
+
+            # If using orjson, expected string is without spaces (except for first space after CONNECT)
+            expected = expected.replace(" ", "").replace("CONNECT", "CONNECT ")
+        except ImportError:
+            pass
         self.assertEqual(expected.encode(), got)

     def test_default_connect_command_with_name(self):
@@ -48,6 +56,15 @@ def test_default_connect_command_with_name(self):
         nc.options["no_echo"] = False
         got = nc._connect_command()
         expected = f'CONNECT {{"echo": true, "lang": "python3", "name": "secret", "pedantic": false, "protocol": 1, "verbose": false, "version": "{__version__}"}}\r\n'
+
+        try:
+            import orjson
+
+            # If using orjson, expected string is without spaces (except for first space after CONNECT)
+            expected = expected.replace(" ", "").replace("CONNECT", "CONNECT ")
+        except ImportError:
+            pass
+
         self.assertEqual(expected.encode(), got)

     def test_semver_parsing(self):
diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py
index 838c3a8a..98bca630 100644
--- a/tests/test_compatibility.py
+++ b/tests/test_compatibility.py
@@ -1,7 +1,6 @@
 from __future__ import annotations

 import asyncio
-import json
 import os
 from dataclasses import dataclass, field
 from typing import Any, Dict, List, Optional
@@ -9,6 +8,7 @@

 import nats
 from nats.aio.subscription import Subscription
+from nats.json_util import JsonUtil as json
 from nats.micro.request import ServiceError
 from nats.micro.service import (
     EndpointConfig,
diff --git a/tests/test_js.py b/tests/test_js.py
index 08a6377b..935c83f6 100644
--- a/tests/test_js.py
+++ b/tests/test_js.py
@@ -2,14 +2,12 @@
 import base64
 import datetime
 import io
-import json
 import random
 import re
 import string
 import tempfile
 import time
 import unittest
-import uuid
 from hashlib import sha256

 import nats
@@ -20,6 +18,7 @@
 from nats.aio.msg import Msg
 from nats.errors import *
 from nats.js.errors import *
+from nats.json_util import JsonUtil as json
 from tests.utils import *

 try:
@@ -27,6 +26,11 @@
 except ImportError:
     parse_email = None

+try:
+    import uuid_utils as uuid
+except ImportError:
+    import uuid
+

 class PublishTest(SingleJetStreamServerTestCase):

diff --git a/tests/test_micro_service.py b/tests/test_micro_service.py
index 9fa47fc7..4b936c5c 100644
--- a/tests/test_micro_service.py
+++ b/tests/test_micro_service.py
@@ -77,13 +77,7 @@ async def add_handler(request: Request):
             svcs.append(svc)

         for _ in range(50):
-            await nc.request(
-                "svc.add",
-                json.dumps({
-                    "x": 22,
-                    "y": 11
-                }).encode("utf-8")
-            )
+            await nc.request("svc.add", json.dump_bytes({"x": 22, "y": 11}))

         for svc in svcs:
             info = svc.info()
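
Both new dependencies stay optional: the extras added in setup.py and pyproject.toml make them opt-in, for example `pip install -e .[fast-mail-parser,orjson,uuid_utils]` as the new Travis jobs do, and without them the client falls back to the standard library. A small probe (illustrative only, mirroring the fallback imports introduced in this patch) shows which implementations a given environment will actually use:

    # Report which optional speedups are available (illustration, not part of the patch).
    try:
        import orjson  # noqa: F401
        json_backend = "orjson"
    except ImportError:
        json_backend = "stdlib json"

    try:
        from uuid_utils import uuid4  # same fallback order as nats/aio/subscription.py
    except ImportError:
        from uuid import uuid4

    print(f"JSON backend: {json_backend}; uuid4 sample: {uuid4()}")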