Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bucket pagination #658

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
1. [#657](https://github.com/influxdata/influxdb-client-python/pull/657): Prefer datetime.fromisoformat over dateutil.parse in Python 3.11+
1. [#658](https://github.com/influxdata/influxdb-client-python/pull/658): Add `find_buckets_iter` function that allows iterating through all pages of buckets.

## 1.43.0 [2024-05-17]

Expand Down
2 changes: 1 addition & 1 deletion examples/buckets_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
List all Buckets
"""
print(f"\n------- List -------\n")
buckets = buckets_api.find_buckets().buckets
buckets = buckets_api.find_buckets_iter()
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
for bucket in buckets]))
print("---")
Expand Down
66 changes: 66 additions & 0 deletions influxdb_client/client/_pages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@


class _Page:
    """One page of resources returned by a paginated endpoint.

    :param values: resources contained in the page; ``None`` is treated as an empty page
    :param has_next: ``True`` when a further page may be requested after this one
    :param next_after: value to pass as ``after`` when requesting the following page
    """

    def __init__(self, values, has_next, next_after):
        self.has_next = has_next
        # Keep a private copy so later changes to the page cannot alter the caller's list.
        self.values = list(values) if values is not None else []
        self.next_after = next_after

    @staticmethod
    def empty():
        """Return a terminal page: no values and nothing more to fetch."""
        return _Page([], False, None)

    @staticmethod
    def initial(after):
        """Return the placeholder page used before the first request.

        :param after: optional resource ID to start seeking after
        """
        return _Page([], True, after)


class _PageIterator:
    """Iterator yielding resources one at a time, fetching further pages on demand.

    :param page: page to start from; its ``values`` are yielded before anything is fetched
    :param get_next_page: callable that receives the current page and returns the next one
    """

    def __init__(self, page: '_Page', get_next_page):
        self.page = page
        self.get_next_page = get_next_page
        # Cursor over the current page; avoids O(n) ``pop(0)`` and leaves ``page.values`` intact.
        self._pending = iter(page.values or ())

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._pending)
        except StopIteration:
            pass  # current page is exhausted; try to move on to the next one

        if not self.page.has_next:
            raise StopIteration

        self.page = self.get_next_page(self.page)
        self._pending = iter(self.page.values or ())
        # An empty fetched page ends the iteration (``next`` raises StopIteration).
        return next(self._pending)


class _Paginated:
    """Adapter that turns a paginated getter into an iterator over all its resources.

    :param paginated_getter: callable invoked with the query keyword arguments for each page
    :param pluck_page_resources_from_response: callable extracting the list of resources
                                               from the getter's response
    """

    def __init__(self, paginated_getter, pluck_page_resources_from_response):
        self.paginated_getter = paginated_getter
        self.pluck_page_resources_from_response = pluck_page_resources_from_response

    def find_iter(self, **kwargs):
        """Iterate over resources with pagination.

        :key str org: The organization name.
        :key str org_id: The organization ID.
        :key str after: Resource ID to seek after (the resource itself is not included).
        :key int limit: The maximum number of items per page.
        :return: resources iterator
        """

        def get_next_page(page: _Page):
            return self._find_next_page(page, **kwargs)

        return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))

    def _find_next_page(self, page: _Page, **kwargs):
        """Request the page that follows ``page`` and wrap the response in a ``_Page``."""
        if not page.has_next:
            return _Page.empty()

        # Only override ``after`` once a previous page supplied a cursor.
        kw_args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
        response = self.paginated_getter(**kw_args)

        # Treat a missing resource collection as an empty page.
        resources = self.pluck_page_resources_from_response(response) or []
        # A response without ``links`` is treated as the last page instead of raising.
        links = getattr(response, 'links', None)
        has_next = getattr(links, 'next', None) is not None
        last_id = resources[-1].id if resources else None

        return _Page(resources, has_next, last_id)
13 changes: 13 additions & 0 deletions influxdb_client/client/bucket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param
from influxdb_client.client._pages import _Paginated


class BucketsApi(object):
Expand Down Expand Up @@ -117,3 +118,15 @@ def find_buckets(self, **kwargs):
:return: Buckets
"""
return self._buckets_service.get_buckets(**kwargs)

def find_buckets_iter(self, **kwargs):
    """Iterate over all buckets, requesting further pages as needed.

    :key str name: Only returns buckets with the specified name
    :key str org: The organization name.
    :key str org_id: The organization ID.
    :key str after: Bucket ID to seek after (the bucket itself is not included).
    :key int limit: the maximum number of buckets in one page
    :return: Buckets iterator
    """
    def pluck_buckets(response):
        return response.buckets

    paginated = _Paginated(self._buckets_service.get_buckets, pluck_buckets)
    return paginated.find_iter(**kwargs)
52 changes: 2 additions & 50 deletions influxdb_client/client/tasks_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,7 @@

from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \
AddResourceMemberRequestBody, RunManually, Run, LogEvent


class _Page:
    """One page of resources returned by a paginated endpoint.

    :param values: resources contained in the page; ``None`` is treated as an empty page
    :param has_next: ``True`` when a further page may be requested after this one
    :param next_after: value to pass as ``after`` when requesting the following page
    """

    def __init__(self, values, has_next, next_after):
        self.has_next = has_next
        # Keep a private copy so later changes to the page cannot alter the caller's list.
        self.values = list(values) if values is not None else []
        self.next_after = next_after

    @staticmethod
    def empty():
        """Return a terminal page: no values and nothing more to fetch."""
        return _Page([], False, None)

    @staticmethod
    def initial(after):
        """Return the placeholder page used before the first request.

        :param after: optional resource ID to start seeking after
        """
        return _Page([], True, after)


class _PageIterator:
    """Iterator yielding resources one at a time, fetching further pages on demand.

    :param page: page to start from; its ``values`` are yielded before anything is fetched
    :param get_next_page: callable that receives the current page and returns the next one
    """

    def __init__(self, page: '_Page', get_next_page):
        self.page = page
        self.get_next_page = get_next_page
        # Cursor over the current page; avoids O(n) ``pop(0)`` and leaves ``page.values`` intact.
        self._pending = iter(page.values or ())

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._pending)
        except StopIteration:
            pass  # current page is exhausted; try to move on to the next one

        if not self.page.has_next:
            raise StopIteration

        self.page = self.get_next_page(self.page)
        self._pending = iter(self.page.values or ())
        # An empty fetched page ends the iteration (``next`` raises StopIteration).
        return next(self._pending)
from influxdb_client.client._pages import _Paginated


class TasksApi(object):
Expand Down Expand Up @@ -80,11 +49,7 @@ def find_tasks_iter(self, **kwargs):
:key int limit: the number of tasks in one page
:return: Tasks iterator
"""

def get_next_page(page: _Page):
return self._find_tasks_next_page(page, **kwargs)

return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))
return _Paginated(self._service.get_tasks, lambda response: response.tasks).find_iter(**kwargs)

def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
"""Create a new task."""
Expand Down Expand Up @@ -259,16 +224,3 @@ def get_logs(self, task_id: str) -> List['LogEvent']:
def find_tasks_by_user(self, task_user_id):
    """List all tasks by user."""
    # Delegates to find_tasks, filtering on the given user ID.
    user_filter = {'user': task_user_id}
    return self.find_tasks(**user_filter)

def _find_tasks_next_page(self, page: _Page, **kwargs):
    """Request the page of tasks that follows ``page``.

    :param page: the page fetched previously (or the initial placeholder page)
    :param kwargs: query arguments forwarded to the tasks service
    :return: the next ``_Page``; an empty terminal page when ``page`` has no successor
    """
    if not page.has_next:
        return _Page.empty()

    # Only override ``after`` once a previous page supplied a cursor.
    args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
    tasks_response = self._service.get_tasks(**args)

    # Treat a missing task collection as an empty page.
    tasks = tasks_response.tasks or []
    # A response without ``links`` is treated as the last page instead of raising.
    links = getattr(tasks_response, 'links', None)
    has_next = getattr(links, 'next', None) is not None
    last_id = tasks[-1].id if tasks else None

    return _Page(tasks, has_next, last_id)
47 changes: 43 additions & 4 deletions tests/test_BucketsApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,65 @@ def test_create_bucket_retention_list(self):

self.delete_test_bucket(my_bucket)

def test_pagination(self):
def test_find_buckets(self):
my_org = self.find_my_org()
buckets = self.buckets_api.find_buckets().buckets
buckets = self.buckets_api.find_buckets(limit=100).buckets
size = len(buckets)

# create 2 buckets
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)

buckets = self.buckets_api.find_buckets().buckets
buckets = self.buckets_api.find_buckets(limit=size + 2).buckets
self.assertEqual(size + 2, len(buckets))

# offset 1
buckets = self.buckets_api.find_buckets(offset=1).buckets
buckets = self.buckets_api.find_buckets(offset=1, limit=size + 2).buckets
self.assertEqual(size + 1, len(buckets))

# count 1
buckets = self.buckets_api.find_buckets(limit=1).buckets
self.assertEqual(1, len(buckets))

def test_find_buckets_iter(self):
    """Check find_buckets_iter with name filters, several page sizes and an ``after`` cursor."""
    def count_unique_ids(items):
        # Distinct IDs, so a resource returned on two pages is counted once.
        return len({item.id for item in items})

    my_org = self.find_my_org()
    more_buckets = 10
    num_of_buckets = count_unique_ids(self.buckets_api.find_buckets_iter()) + more_buckets

    a_bucket_name = None
    for _ in range(more_buckets):
        bucket_name = self.generate_name("it find_buckets_iter")
        self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org)
        a_bucket_name = bucket_name

    find_iter = self.buckets_api.find_buckets_iter
    third = num_of_buckets // 3

    # a name nobody uses -> nothing is returned
    self.assertEqual(count_unique_ids(find_iter(name=a_bucket_name + "blah")), 0)

    # an exact name -> exactly one bucket
    self.assertEqual(count_unique_ids(find_iter(name=a_bucket_name)), 1)

    # page size of a third -> 3-4 requests
    self.assertEqual(count_unique_ids(find_iter(limit=third)), num_of_buckets)

    # page size equal to the total -> one batch
    self.assertEqual(count_unique_ids(find_iter(limit=num_of_buckets)), num_of_buckets)

    # page size larger than the total -> still one batch
    self.assertEqual(count_unique_ids(find_iter(limit=num_of_buckets + 1)), num_of_buckets)

    # start after the last bucket of the first third
    *_, skip_bucket = self.buckets_api.find_buckets(limit=third).buckets
    remaining = find_iter(after=skip_bucket.id)
    self.assertEqual(count_unique_ids(remaining), num_of_buckets - third)

def test_update_bucket(self):
my_org = self.find_my_org()

Expand Down