Skip to content

Commit 0c60b5d

Browse files
authoredNov 29, 2023
feat: task pagination (#616)
1 parent 2f9e5ed commit 0c60b5d

File tree

4 files changed

+103
-1
lines changed

4 files changed

+103
-1
lines changed
 

‎CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.39.0 [unreleased]
22

3+
### Features
4+
1. [#616](https://github.com/influxdata/influxdb-client-python/pull/616): Add `find_tasks_iter` function that allow iterate through all pages of tasks.
5+
36
## 1.38.0 [2023-10-02]
47

58
### Bug Fixes

‎examples/task_example.py

+6
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,9 @@
2525
task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active")
2626
task = tasks_api.create_task(task_create_request=task_request)
2727
print(task)
28+
29+
tasks = tasks_api.find_tasks_iter()
30+
31+
# print all tasks id
32+
for task in tasks:
33+
print(task.id)

‎influxdb_client/client/tasks_api.py

+63-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,38 @@
1111
AddResourceMemberRequestBody, RunManually, Run, LogEvent
1212

1313

14+
class _Page:
15+
def __init__(self, values, has_next, next_after):
16+
self.has_next = has_next
17+
self.values = values
18+
self.next_after = next_after
19+
20+
@staticmethod
21+
def empty():
22+
return _Page([], False, None)
23+
24+
@staticmethod
25+
def initial(after):
26+
return _Page([], True, after)
27+
28+
29+
class _PageIterator:
30+
def __init__(self, page: _Page, get_next_page):
31+
self.page = page
32+
self.get_next_page = get_next_page
33+
34+
def __iter__(self):
35+
return self
36+
37+
def __next__(self):
38+
if not self.page.values:
39+
if self.page.has_next:
40+
self.page = self.get_next_page(self.page)
41+
if not self.page.values:
42+
raise StopIteration
43+
return self.page.values.pop(0)
44+
45+
1446
class TasksApi(object):
1547
"""Implementation for '/api/v2/tasks' endpoint."""
1648

@@ -25,7 +57,7 @@ def find_task_by_id(self, task_id) -> Task:
2557
return task
2658

2759
def find_tasks(self, **kwargs):
28-
"""List all tasks.
60+
"""List all tasks up to set limit (max 500).
2961
3062
:key str name: only returns tasks with the specified name
3163
:key str after: returns tasks after specified ID
@@ -37,6 +69,23 @@ def find_tasks(self, **kwargs):
3769
"""
3870
return self._service.get_tasks(**kwargs).tasks
3971

72+
def find_tasks_iter(self, **kwargs):
73+
"""Iterate over all tasks with pagination.
74+
75+
:key str name: only returns tasks with the specified name
76+
:key str after: returns tasks after specified ID
77+
:key str user: filter tasks to a specific user ID
78+
:key str org: filter tasks to a specific organization name
79+
:key str org_id: filter tasks to a specific organization ID
80+
:key int limit: the number of tasks in one page
81+
:return: Tasks iterator
82+
"""
83+
84+
def get_next_page(page: _Page):
85+
return self._find_tasks_next_page(page, **kwargs)
86+
87+
return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))
88+
4089
def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
4190
"""Create a new task."""
4291
if task_create_request is not None:
@@ -210,3 +259,16 @@ def get_logs(self, task_id: str) -> List['LogEvent']:
210259
def find_tasks_by_user(self, task_user_id):
211260
"""List all tasks by user."""
212261
return self.find_tasks(user=task_user_id)
262+
263+
def _find_tasks_next_page(self, page: _Page, **kwargs):
264+
if not page.has_next:
265+
return _Page.empty()
266+
267+
args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
268+
tasks_response = self._service.get_tasks(**args)
269+
270+
tasks = tasks_response.tasks
271+
has_next = tasks_response.links.next is not None
272+
last_id = tasks[-1].id if tasks else None
273+
274+
return _Page(tasks, has_next, last_id)

‎tests/test_TasksApi.py

+31
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,37 @@ def test_find_task_by_user_id(self):
184184
print(tasks)
185185
self.assertEqual(len(tasks), 1)
186186

187+
def test_find_tasks_iter(self):
188+
task_name = self.generate_name("it task")
189+
num_of_tasks = 10
190+
191+
for _ in range(num_of_tasks):
192+
self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id)
193+
194+
def count_unique_ids(tasks):
195+
return len(set(map(lambda task: task.id, tasks)))
196+
197+
# get tasks in 3-4 batches
198+
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks // 3)
199+
self.assertEqual(count_unique_ids(tasks), num_of_tasks)
200+
201+
# get tasks in one equaly size batch
202+
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks)
203+
self.assertEqual(count_unique_ids(tasks), num_of_tasks)
204+
205+
# get tasks in one batch
206+
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks + 1)
207+
self.assertEqual(count_unique_ids(tasks), num_of_tasks)
208+
209+
# get no tasks
210+
tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah")
211+
self.assertEqual(count_unique_ids(tasks), 0)
212+
213+
# skip some tasks
214+
*_, split_task = self.tasks_api.find_tasks(name= task_name, limit= num_of_tasks // 3)
215+
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= 3, after= split_task.id)
216+
self.assertEqual(count_unique_ids(tasks), num_of_tasks - num_of_tasks // 3)
217+
187218
def test_delete_task(self):
188219
task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *",
189220
self.organization.id)

0 commit comments

Comments
 (0)
Please sign in to comment.