"""Helpers for iterating over cursor-paginated ("after"-based) API listings."""
from collections import deque


class _Page:
    """A single page of resources plus the cursor needed to request the next one."""

    def __init__(self, values, has_next, next_after):
        """
        Create a page.

        :param values: resources contained in this page (``None`` is treated as empty)
        :param has_next: ``True`` if another page may be requested after this one
        :param next_after: ID to pass as ``after`` when requesting the next page
        """
        self.has_next = has_next
        # Copy into a deque: popping from the left is O(1) (``list.pop(0)`` is O(n)), and the
        # list that still belongs to the API response object is not emptied while iterating.
        self.values = deque(values or ())
        self.next_after = next_after

    @staticmethod
    def empty():
        """Return a terminal page: no values and nothing more to fetch."""
        return _Page([], False, None)

    @staticmethod
    def initial(after):
        """Return the placeholder page used before the first request is made."""
        return _Page([], True, after)


class _PageIterator:
    """Iterator that yields resources one by one, fetching further pages lazily."""

    def __init__(self, page: _Page, get_next_page):
        """
        Create the iterator.

        :param page: page to start from (normally ``_Page.initial(after)``)
        :param get_next_page: callable taking the current page and returning the next ``_Page``
        """
        self.page = page
        self.get_next_page = get_next_page

    def __iter__(self):
        return self

    def __next__(self):
        if not self.page.values:
            if self.page.has_next:
                self.page = self.get_next_page(self.page)
            if not self.page.values:
                # Pin a terminal page so the iterator stays exhausted: without this, an empty
                # page that still reports ``has_next`` would trigger a fresh request (from the
                # original cursor) on any later ``next()`` call.
                self.page = _Page.empty()
                raise StopIteration
        return self.page.values.popleft()


class _Paginated:
    """Adapter turning a paginated "list" service call into a resources iterator."""

    def __init__(self, paginated_getter, pluck_page_resources_from_response):
        """
        Create the adapter.

        :param paginated_getter: callable invoked with the request keyword arguments;
                                 its response must expose ``links.next``
        :param pluck_page_resources_from_response: callable extracting the list of resources
                                                   (each exposing ``id``) from a response
        """
        self.paginated_getter = paginated_getter
        self.pluck_page_resources_from_response = pluck_page_resources_from_response

    def find_iter(self, **kwargs):
        """Iterate over resources with pagination.

        :key str org: The organization name.
        :key str org_id: The organization ID.
        :key str after: The last resource ID from which to seek from (but not including).
        :key int limit: the maximum number of items per page
        :return: resources iterator
        """

        def get_next_page(page: _Page):
            return self._find_next_page(page, **kwargs)

        return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))

    def _find_next_page(self, page: _Page, **kwargs):
        """Request the page following ``page`` and wrap the response into a ``_Page``."""
        if not page.has_next:
            return _Page.empty()

        # Only override ``after`` once a cursor is known; the first request uses kwargs as given.
        kw_args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
        response = self.paginated_getter(**kw_args)

        resources = self.pluck_page_resources_from_response(response)
        has_next = response.links.next is not None
        # The ID of the last resource becomes the ``after`` cursor of the next request.
        last_id = resources[-1].id if resources else None

        return _Page(resources, has_next, last_id)
+ :key int limit: the maximum number of buckets in one page + :return: Buckets iterator + """ + return _Paginated(self._buckets_service.get_buckets, lambda response: response.buckets).find_iter(**kwargs) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index 9edb2ec9..5ca18fbd 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -9,38 +9,7 @@ from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \ AddResourceMemberRequestBody, RunManually, Run, LogEvent - - -class _Page: - def __init__(self, values, has_next, next_after): - self.has_next = has_next - self.values = values - self.next_after = next_after - - @staticmethod - def empty(): - return _Page([], False, None) - - @staticmethod - def initial(after): - return _Page([], True, after) - - -class _PageIterator: - def __init__(self, page: _Page, get_next_page): - self.page = page - self.get_next_page = get_next_page - - def __iter__(self): - return self - - def __next__(self): - if not self.page.values: - if self.page.has_next: - self.page = self.get_next_page(self.page) - if not self.page.values: - raise StopIteration - return self.page.values.pop(0) +from influxdb_client.client._pages import _Paginated class TasksApi(object): @@ -80,11 +49,7 @@ def find_tasks_iter(self, **kwargs): :key int limit: the number of tasks in one page :return: Tasks iterator """ - - def get_next_page(page: _Page): - return self._find_tasks_next_page(page, **kwargs) - - return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page)) + return _Paginated(self._service.get_tasks, lambda response: response.tasks).find_iter(**kwargs) def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" @@ -259,16 +224,3 @@ def get_logs(self, task_id: str) -> List['LogEvent']: def find_tasks_by_user(self, task_user_id): """List all tasks by 
def test_find_buckets(self):
    # Verifies that `find_buckets` honours the `limit` and `offset` arguments.
    org = self.find_my_org()
    initial_count = len(self.buckets_api.find_buckets(limit=100).buckets)

    # Two additional buckets must show up in the listing.
    for _ in range(2):
        self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=org)
    expected_total = initial_count + 2

    listed = self.buckets_api.find_buckets(limit=expected_total).buckets
    self.assertEqual(expected_total, len(listed))

    # Skipping the first bucket leaves one fewer.
    listed = self.buckets_api.find_buckets(offset=1, limit=expected_total).buckets
    self.assertEqual(initial_count + 1, len(listed))

    # A limit of one yields exactly one bucket.
    listed = self.buckets_api.find_buckets(limit=1).buckets
    self.assertEqual(1, len(listed))

def test_find_buckets_iter(self):
    # Verifies that `find_buckets_iter` walks every page and honours `name`, `limit`, `after`.
    def count_unique_ids(items):
        return len({item.id for item in items})

    org = self.find_my_org()
    extra = 10
    expected_total = count_unique_ids(self.buckets_api.find_buckets_iter()) + extra

    created_names = []
    for _ in range(extra):
        created_names.append(self.generate_name("it find_buckets_iter"))
        self.buckets_api.create_bucket(bucket_name=created_names[-1], org=org)
    a_bucket_name = created_names[-1]

    # A name matching nothing yields an empty iterator.
    self.assertEqual(count_unique_ids(self.buckets_api.find_buckets_iter(name=a_bucket_name + "blah")), 0)

    # An exact name yields a single bucket.
    self.assertEqual(count_unique_ids(self.buckets_api.find_buckets_iter(name=a_bucket_name)), 1)

    # Page sizes: a third of the total (3-4 requests), exactly the total, and more than the total.
    third = expected_total // 3
    for page_size in (third, expected_total, expected_total + 1):
        found = self.buckets_api.find_buckets_iter(limit=page_size)
        self.assertEqual(count_unique_ids(found), expected_total)

    # Starting after the last bucket of the first third skips exactly that third.
    *_, skip_bucket = self.buckets_api.find_buckets(limit=third).buckets
    remaining = self.buckets_api.find_buckets_iter(after=skip_bucket.id)
    self.assertEqual(count_unique_ids(remaining), expected_total - third)