Skip to content

Commit f51d7e2

Browse files
authored
Migrate from requests to httpx (#1020)
1 parent 3395942 commit f51d7e2

File tree

13 files changed

+111
-80
lines changed

13 files changed

+111
-80
lines changed

LICENSES/LICENSE.httpx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
Copyright © 2019, [Encode OSS Ltd](https://www.encode.io/).
2+
All rights reserved.
3+
4+
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
5+
6+
* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
7+
8+
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
9+
10+
* Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
11+
12+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

conda/meta.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ requirements:
1515
- python >=3.9,<3.13
1616
run:
1717
- python >=3.9,<3.13
18-
- requests >=2.32
1918
- typing_extensions >=4.8
2019
- orjson >=3.9,<4
2120
- pydantic >=2.7,<2.12
@@ -26,7 +25,6 @@ requirements:
2625
- google-cloud-bigquery >=3.26.0,<3.27
2726
- google-cloud-pubsub >=2.23.1,<3
2827
- psycopg2-binary >=2.9.9,<3
29-
- types-psycopg2 >=2.9,<3
3028
- boto3 >=1.35.65,<2.0
3129
- boto3-stubs >=1.35.65,<2.0
3230
- azure-storage-blob >=12.24.0,<12.25
@@ -36,6 +34,7 @@ requirements:
3634
- elasticsearch >=8.17,<9
3735
- rich >=13,<15
3836
- python-dateutil >=2.8.2,<3
37+
- httpx >= 0.28.1
3938

4039
test:
4140
imports:

conda/post-link.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ $PREFIX/bin/pip install \
99
'confluent-kafka[avro,json,protobuf,schemaregistry]>=2.8.2,<2.12' \
1010
'influxdb>=5.3,<6' \
1111
'jsonpath_ng>=1.7.0,<2' \
12-
'types-psycopg2>=2.9,<3'
12+
'httpx>=0.28.1'

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ all = [
3535
"google-cloud-bigquery>=3.26.0,<3.27",
3636
"google-cloud-pubsub>=2.23.1,<3",
3737
"psycopg2-binary>=2.9.9,<3",
38-
"types-psycopg2>=2.9,<3",
3938
"boto3>=1.35.65,<2.0",
4039
"boto3-stubs>=1.35.65,<2.0",
4140
"redis[hiredis]>=5.2.0,<6",

quixstreams/platforms/quix/api.py

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import json
12
from io import BytesIO
23
from typing import List, Literal, Optional
3-
from urllib.parse import urljoin
44
from zipfile import ZipFile
55

6-
import requests
6+
import httpx
77

88
from .env import QUIX_ENVIRONMENT
99
from .exceptions import (
@@ -33,23 +33,36 @@ def __init__(
3333
self,
3434
auth_token: str,
3535
portal_api: str,
36-
api_version: Optional[str] = None,
36+
api_version: str = "2.0",
3737
default_workspace_id: Optional[str] = None,
3838
):
39-
self._portal_api_url = portal_api
4039
if not auth_token:
4140
raise MissingConnectionRequirements(
4241
f"A Quix Cloud auth token (SDK or PAT) is required; "
4342
f"set with environment variable {QUIX_ENVIRONMENT.SDK_TOKEN}"
4443
)
4544

45+
self._api_version = api_version
46+
self._auth_token = auth_token
47+
self._portal_api_url = portal_api
4648
self._default_workspace_id = (
4749
default_workspace_id or QUIX_ENVIRONMENT.workspace_id
4850
)
4951
self._request_timeout = 30
50-
self.session = self._init_session(
51-
api_version=api_version or "2.0", auth_token=auth_token
52-
)
52+
self._client: Optional[httpx.Client] = None
53+
54+
@property
55+
def client(self) -> httpx.Client:
56+
if not self._client:
57+
self._client = httpx.Client(
58+
base_url=self._portal_api_url,
59+
event_hooks={"response": [self._response_handler]},
60+
headers={
61+
"X-Version": self._api_version,
62+
"Authorization": f"Bearer {self._auth_token}",
63+
},
64+
)
65+
return self._client
5366

5467
@property
5568
def default_workspace_id(self) -> str:
@@ -68,8 +81,8 @@ def get_librdkafka_connection_config(
6881
self, workspace_id: Optional[str] = None, timeout: float = 30
6982
) -> dict:
7083
workspace_id = workspace_id or self.default_workspace_id
71-
return self.session.get(
72-
self._build_url(f"/workspaces/{workspace_id}/broker/librdkafka"),
84+
return self.client.get(
85+
f"/workspaces/{workspace_id}/broker/librdkafka",
7386
timeout=timeout,
7487
).json()
7588

@@ -86,8 +99,8 @@ def get_workspace_certificate(
8699
:return: certificate as bytes if present, or None
87100
"""
88101
workspace_id = workspace_id or self.default_workspace_id
89-
content = self.session.get(
90-
self._build_url(f"/workspaces/{workspace_id}/certificates"), timeout=timeout
102+
content = self.client.get(
103+
f"/workspaces/{workspace_id}/certificates", timeout=timeout
91104
).content
92105
if not content:
93106
return None
@@ -97,28 +110,24 @@ def get_workspace_certificate(
97110
return f.read()
98111

99112
def get_auth_token_details(self, timeout: float = 30) -> dict:
100-
return self.session.get(
101-
self._build_url("/auth/token/details"), timeout=timeout
102-
).json()
113+
return self.client.get("/auth/token/details", timeout=timeout).json()
103114

104115
def get_workspace(
105116
self, workspace_id: Optional[str] = None, timeout: float = 30
106117
) -> dict:
107118
workspace_id = workspace_id or self.default_workspace_id
108-
return self.session.get(
109-
self._build_url(f"/workspaces/{workspace_id}"), timeout=timeout
110-
).json()
119+
return self.client.get(f"/workspaces/{workspace_id}", timeout=timeout).json()
111120

112121
def get_workspaces(self, timeout: float = 30) -> List[dict]:
113122
# TODO: This seems only return [] with Personal Access Tokens as of Sept 7 '23
114-
return self.session.get(self._build_url("/workspaces"), timeout=timeout).json()
123+
return self.client.get("/workspaces", timeout=timeout).json()
115124

116125
def get_topic(
117126
self, topic_name: str, workspace_id: Optional[str] = None, timeout: float = 30
118127
) -> dict:
119128
workspace_id = workspace_id or self.default_workspace_id
120-
return self.session.get(
121-
self._build_url(f"/{workspace_id}/topics/{topic_name}"), timeout=timeout
129+
return self.client.get(
130+
f"/{workspace_id}/topics/{topic_name}", timeout=timeout
122131
).json()
123132

124133
def get_topics(
@@ -127,9 +136,7 @@ def get_topics(
127136
timeout: float = 30,
128137
) -> List[dict]:
129138
workspace_id = workspace_id or self.default_workspace_id
130-
return self.session.get(
131-
self._build_url(f"/{workspace_id}/topics"), timeout=timeout
132-
).json()
139+
return self.client.get(f"/{workspace_id}/topics", timeout=timeout).json()
133140

134141
def post_topic(
135142
self,
@@ -153,11 +160,11 @@ def post_topic(
153160
"cleanupPolicy": cleanup_policy,
154161
},
155162
}
156-
return self.session.post(
157-
self._build_url(f"/{workspace_id}/topics"), json=d, timeout=timeout
163+
return self.client.post(
164+
f"/{workspace_id}/topics", json=d, timeout=timeout
158165
).json()
159166

160-
def _response_handler(self, r: requests.Response, *args, **kwargs):
167+
def _response_handler(self, r: httpx.Response, *args, **kwargs):
161168
"""
162169
Custom callback/hook that is called after receiving a request.Response
163170
@@ -167,28 +174,25 @@ def _response_handler(self, r: requests.Response, *args, **kwargs):
167174
"""
168175
try:
169176
r.raise_for_status()
170-
except requests.exceptions.HTTPError as e:
177+
except httpx.HTTPStatusError as e:
178+
content = e.response.read()
179+
171180
try:
172-
error_text = e.response.json()
173-
except requests.exceptions.JSONDecodeError:
181+
error_text = json.loads(content)
182+
except json.JSONDecodeError:
174183
error_text = e.response.text
175184

176185
raise QuixApiRequestFailure(
177186
status_code=e.response.status_code,
178-
url=e.response.url,
187+
url=str(e.response.url),
179188
error_text=error_text,
180-
)
189+
) from e
181190

182-
def _build_url(self, path: str) -> str:
183-
return urljoin(base=self._portal_api_url, url=path)
184-
185-
def _init_session(self, api_version: str, auth_token: str) -> requests.Session:
186-
session = requests.Session()
187-
session.hooks = {"response": self._response_handler}
188-
session.headers.update(
189-
{
190-
"X-Version": api_version,
191-
"Authorization": f"Bearer {auth_token}",
192-
}
193-
)
194-
return session
191+
def __getstate__(self) -> object:
192+
"""
193+
Drops the "_client" attribute to support pickling.
194+
httpx.Client is not pickleable by default.
195+
"""
196+
state = self.__dict__.copy()
197+
state.pop("_client", None)
198+
return state

quixstreams/platforms/quix/config.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
from copy import deepcopy
66
from typing import Any, List, Optional
77

8-
from requests import HTTPError
9-
108
from quixstreams.kafka.configuration import ConnectionConfig
119
from quixstreams.models.topics import Topic, TopicConfig
1210

@@ -266,7 +264,7 @@ def search_for_workspace(
266264
return self._api.get_workspace(
267265
workspace_id=workspace_name_or_id, timeout=timeout
268266
)
269-
except HTTPError:
267+
except QuixApiRequestFailure:
270268
# check to see if they provided the workspace name instead
271269
ws_list = self._api.get_workspaces(timeout=timeout)
272270
for ws in ws_list:

quixstreams/sinks/core/influxdb3.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
import time
44
from datetime import datetime, timezone
55
from typing import Any, Callable, Iterable, Literal, Mapping, Optional, Union, get_args
6-
from urllib.parse import urljoin
76

8-
import requests
7+
import httpx
98

109
from quixstreams.models import HeadersTuples
1110

@@ -200,8 +199,8 @@ def __init__(
200199
def _get_influx_version(self):
201200
# This validates the token is valid regardless of version
202201
try:
203-
r = requests.get(
204-
urljoin(self._client_args["host"], "ping"),
202+
r = httpx.get(
203+
httpx.URL(host=self._client_args["host"], path="ping"),
205204
headers={"Authorization": f"Token {self._client_args['token']}"},
206205
timeout=self._request_timeout_ms / 1000,
207206
)

requirements-mypy.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ mypy==1.17.1
22
mypy-extensions==1.1.0
33
types-jsonschema==4.25.0.20250720
44
types-protobuf==6.30.2.20250703
5-
types-requests==2.32.4.20250611
5+
types-psycopg2>=2.9,<3

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
confluent-kafka[avro,json,protobuf,schemaregistry]>=2.8.2,<2.12
2-
requests>=2.32
32
rocksdict>=0.3,<0.4
43
typing_extensions>=4.8
54
orjson>=3.9,<4
@@ -8,4 +7,5 @@ pydantic-settings>=2.3,<2.11
87
jsonschema>=4.3.0
98
jsonlines>=4,<5
109
rich>=13,<15
11-
jsonpath_ng>=1.7.0,<2
10+
jsonpath_ng>=1.7.0,<2
11+
httpx>=0.28.1

tests/requirements.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
testcontainers[postgres]==4.12.0
22
pytest
3-
requests>=2.32
43
docker>=7.1.0 # Required to use requests>=2.32
54
fastavro>=1.8,<2.0
65
protobuf>=5.27.2,<7.0
@@ -9,4 +8,3 @@ pyiceberg[pyarrow,glue]>=0.7
98
redis[hiredis]>=5.2.0,<6
109
pandas>=1.0.0,<3.0
1110
psycopg2-binary>=2.9,<3
12-
types-psycopg2>=2.9,<3

0 commit comments

Comments
 (0)