Skip to content

Commit 6798be4

Browse files
mbenabdabednar
andauthoredMay 28, 2024··
feat: bucket pagination (#658)
* feat: support buckets pagination * docs: update example * docs: add changelog entry * Update CHANGELOG.md Co-authored-by: Jakub Bednář <jakub.bednar@gmail.com> * encapsulate pagination --------- Co-authored-by: Mehdi BEN ABDALLAH <@mbenabda> Co-authored-by: Jakub Bednář <jakub.bednar@gmail.com>
1 parent 73849e7 commit 6798be4

File tree

6 files changed

+126
-55
lines changed

6 files changed

+126
-55
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Features
44
1. [#657](https://github.com/influxdata/influxdb-client-python/pull/657): Prefer datetime.fromisoformat over dateutil.parse in Python 3.11+
5+
1. [#658](https://github.com/influxdata/influxdb-client-python/pull/658): Add `find_buckets_iter` function that allow iterate through all pages of buckets.
56

67
## 1.43.0 [2024-05-17]
78

‎examples/buckets_management.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
List all Buckets
3737
"""
3838
print(f"\n------- List -------\n")
39-
buckets = buckets_api.find_buckets().buckets
39+
buckets = buckets_api.find_buckets_iter()
4040
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
4141
for bucket in buckets]))
4242
print("---")

‎influxdb_client/client/_pages.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
2+
3+
class _Page:
4+
def __init__(self, values, has_next, next_after):
5+
self.has_next = has_next
6+
self.values = values
7+
self.next_after = next_after
8+
9+
@staticmethod
10+
def empty():
11+
return _Page([], False, None)
12+
13+
@staticmethod
14+
def initial(after):
15+
return _Page([], True, after)
16+
17+
18+
class _PageIterator:
19+
def __init__(self, page: _Page, get_next_page):
20+
self.page = page
21+
self.get_next_page = get_next_page
22+
23+
def __iter__(self):
24+
return self
25+
26+
def __next__(self):
27+
if not self.page.values:
28+
if self.page.has_next:
29+
self.page = self.get_next_page(self.page)
30+
if not self.page.values:
31+
raise StopIteration
32+
return self.page.values.pop(0)
33+
34+
35+
class _Paginated:
36+
def __init__(self, paginated_getter, pluck_page_resources_from_response):
37+
self.paginated_getter = paginated_getter
38+
self.pluck_page_resources_from_response = pluck_page_resources_from_response
39+
40+
def find_iter(self, **kwargs):
41+
"""Iterate over resources with pagination.
42+
43+
:key str org: The organization name.
44+
:key str org_id: The organization ID.
45+
:key str after: The last resource ID from which to seek from (but not including).
46+
:key int limit: the maximum number of items per page
47+
:return: resources iterator
48+
"""
49+
50+
def get_next_page(page: _Page):
51+
return self._find_next_page(page, **kwargs)
52+
53+
return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))
54+
55+
def _find_next_page(self, page: _Page, **kwargs):
56+
if not page.has_next:
57+
return _Page.empty()
58+
59+
kw_args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
60+
response = self.paginated_getter(**kw_args)
61+
62+
resources = self.pluck_page_resources_from_response(response)
63+
has_next = response.links.next is not None
64+
last_id = resources[-1].id if resources else None
65+
66+
return _Page(resources, has_next, last_id)

‎influxdb_client/client/bucket_api.py

+13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
1010
from influxdb_client.client.util.helpers import get_org_query_param
11+
from influxdb_client.client._pages import _Paginated
1112

1213

1314
class BucketsApi(object):
@@ -117,3 +118,15 @@ def find_buckets(self, **kwargs):
117118
:return: Buckets
118119
"""
119120
return self._buckets_service.get_buckets(**kwargs)
121+
122+
def find_buckets_iter(self, **kwargs):
123+
"""Iterate over all buckets with pagination.
124+
125+
:key str name: Only returns buckets with the specified name
126+
:key str org: The organization name.
127+
:key str org_id: The organization ID.
128+
:key str after: The last resource ID from which to seek from (but not including).
129+
:key int limit: the maximum number of buckets in one page
130+
:return: Buckets iterator
131+
"""
132+
return _Paginated(self._buckets_service.get_buckets, lambda response: response.buckets).find_iter(**kwargs)

‎influxdb_client/client/tasks_api.py

+2-50
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,7 @@
99

1010
from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \
1111
AddResourceMemberRequestBody, RunManually, Run, LogEvent
12-
13-
14-
class _Page:
15-
def __init__(self, values, has_next, next_after):
16-
self.has_next = has_next
17-
self.values = values
18-
self.next_after = next_after
19-
20-
@staticmethod
21-
def empty():
22-
return _Page([], False, None)
23-
24-
@staticmethod
25-
def initial(after):
26-
return _Page([], True, after)
27-
28-
29-
class _PageIterator:
30-
def __init__(self, page: _Page, get_next_page):
31-
self.page = page
32-
self.get_next_page = get_next_page
33-
34-
def __iter__(self):
35-
return self
36-
37-
def __next__(self):
38-
if not self.page.values:
39-
if self.page.has_next:
40-
self.page = self.get_next_page(self.page)
41-
if not self.page.values:
42-
raise StopIteration
43-
return self.page.values.pop(0)
12+
from influxdb_client.client._pages import _Paginated
4413

4514

4615
class TasksApi(object):
@@ -80,11 +49,7 @@ def find_tasks_iter(self, **kwargs):
8049
:key int limit: the number of tasks in one page
8150
:return: Tasks iterator
8251
"""
83-
84-
def get_next_page(page: _Page):
85-
return self._find_tasks_next_page(page, **kwargs)
86-
87-
return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))
52+
return _Paginated(self._service.get_tasks, lambda response: response.tasks).find_iter(**kwargs)
8853

8954
def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
9055
"""Create a new task."""
@@ -259,16 +224,3 @@ def get_logs(self, task_id: str) -> List['LogEvent']:
259224
def find_tasks_by_user(self, task_user_id):
260225
"""List all tasks by user."""
261226
return self.find_tasks(user=task_user_id)
262-
263-
def _find_tasks_next_page(self, page: _Page, **kwargs):
264-
if not page.has_next:
265-
return _Page.empty()
266-
267-
args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
268-
tasks_response = self._service.get_tasks(**args)
269-
270-
tasks = tasks_response.tasks
271-
has_next = tasks_response.links.next is not None
272-
last_id = tasks[-1].id if tasks else None
273-
274-
return _Page(tasks, has_next, last_id)

‎tests/test_BucketsApi.py

+43-4
Original file line numberDiff line numberDiff line change
@@ -83,26 +83,65 @@ def test_create_bucket_retention_list(self):
8383

8484
self.delete_test_bucket(my_bucket)
8585

86-
def test_pagination(self):
86+
def test_find_buckets(self):
8787
my_org = self.find_my_org()
88-
buckets = self.buckets_api.find_buckets().buckets
88+
buckets = self.buckets_api.find_buckets(limit=100).buckets
8989
size = len(buckets)
9090

9191
# create 2 buckets
9292
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)
9393
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)
9494

95-
buckets = self.buckets_api.find_buckets().buckets
95+
buckets = self.buckets_api.find_buckets(limit=size + 2).buckets
9696
self.assertEqual(size + 2, len(buckets))
9797

9898
# offset 1
99-
buckets = self.buckets_api.find_buckets(offset=1).buckets
99+
buckets = self.buckets_api.find_buckets(offset=1, limit=size + 2).buckets
100100
self.assertEqual(size + 1, len(buckets))
101101

102102
# count 1
103103
buckets = self.buckets_api.find_buckets(limit=1).buckets
104104
self.assertEqual(1, len(buckets))
105105

106+
def test_find_buckets_iter(self):
107+
def count_unique_ids(items):
108+
return len(set(map(lambda item: item.id, items)))
109+
110+
my_org = self.find_my_org()
111+
more_buckets = 10
112+
num_of_buckets = count_unique_ids(self.buckets_api.find_buckets_iter()) + more_buckets
113+
114+
a_bucket_name = None
115+
for _ in range(more_buckets):
116+
bucket_name = self.generate_name("it find_buckets_iter")
117+
self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org)
118+
a_bucket_name = bucket_name
119+
120+
# get no buckets
121+
buckets = self.buckets_api.find_buckets_iter(name=a_bucket_name + "blah")
122+
self.assertEqual(count_unique_ids(buckets), 0)
123+
124+
# get bucket by name
125+
buckets = self.buckets_api.find_buckets_iter(name=a_bucket_name)
126+
self.assertEqual(count_unique_ids(buckets), 1)
127+
128+
# get buckets in 3-4 batches
129+
buckets = self.buckets_api.find_buckets_iter(limit=num_of_buckets // 3)
130+
self.assertEqual(count_unique_ids(buckets), num_of_buckets)
131+
132+
# get buckets in one batch
133+
buckets = self.buckets_api.find_buckets_iter(limit=num_of_buckets)
134+
self.assertEqual(count_unique_ids(buckets), num_of_buckets)
135+
136+
# get buckets in one batch, requesting too much
137+
buckets = self.buckets_api.find_buckets_iter(limit=num_of_buckets + 1)
138+
self.assertEqual(count_unique_ids(buckets), num_of_buckets)
139+
140+
# skip some buckets
141+
*_, skip_bucket = self.buckets_api.find_buckets(limit=num_of_buckets // 3).buckets
142+
buckets = self.buckets_api.find_buckets_iter(after=skip_bucket.id)
143+
self.assertEqual(count_unique_ids(buckets), num_of_buckets - num_of_buckets // 3)
144+
106145
def test_update_bucket(self):
107146
my_org = self.find_my_org()
108147

0 commit comments

Comments
 (0)
Please sign in to comment.