-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathtask_collection.py
More file actions
328 lines (284 loc) · 13.4 KB
/
task_collection.py
File metadata and controls
328 lines (284 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from apify_client._docs import docs_group
from apify_client._models import (
ActorStandby,
CreateTaskRequest,
ListOfTasks,
ListOfTasksResponse,
Task,
TaskInput,
TaskOptions,
TaskResponse,
)
from apify_client._pagination import get_items_iterator, get_items_iterator_async
from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync
from apify_client._utils import to_seconds
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterator
from datetime import timedelta
from apify_client._models import TaskShort
from apify_client._typeddicts import TaskInputDict
from apify_client.types import Timeout
@docs_group('Resource clients')
class TaskCollectionClient(ResourceClient):
    """Synchronous sub-client for the collection of Actor tasks.

    Offers operations on the task collection as a whole, such as listing existing tasks and creating new ones.
    Instances are normally obtained through the corresponding method of the `ApifyClient` class.
    """

    def __init__(
        self,
        *,
        resource_path: str = 'actor-tasks',
        **kwargs: Any,
    ) -> None:
        """Set up the client; `resource_path` and all remaining keyword arguments are forwarded to the base class."""
        super().__init__(resource_path=resource_path, **kwargs)

    def list(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
        timeout: Timeout = 'medium',
    ) -> ListOfTasks:
        """Fetch one page of the available tasks with a single API call.

        https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks

        Args:
            limit: Maximum number of tasks to return.
            offset: Number of tasks to skip before the first returned one.
            desc: If true, sort the tasks by creation date in descending order.
            timeout: Timeout for the API HTTP request.

        Returns:
            The page of tasks matching the given filters.
        """
        raw_page = self._list(timeout=timeout, limit=limit, offset=offset, desc=desc)
        envelope = ListOfTasksResponse.model_validate(raw_page)
        return envelope.data

    def iterate(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
        timeout: Timeout = 'medium',
    ) -> Iterator[TaskShort]:
        """Iterate over all available tasks, fetching further pages as needed.

        Unlike `list`, which performs exactly one API call and therefore may not return every matching task,
        the returned iterator can issue several API calls to walk through all tasks matching the criteria.

        https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks

        Args:
            limit: Maximum total number of tasks to yield.
            offset: Number of tasks to skip before the first yielded one.
            desc: If true, sort the tasks by creation date in descending order.
            timeout: Timeout applied to each underlying API HTTP request.

        Yields:
            The tasks matching the given filters.
        """

        def _fetch_page(*, limit: int | None = None, offset: int | None = None) -> ListOfTasks:
            # The pagination helper supplies the page window; ordering and timeout stay fixed for every page.
            return self.list(limit=limit, offset=offset, desc=desc, timeout=timeout)

        return get_items_iterator(_fetch_page, limit=limit, offset=offset)

    def create(
        self,
        *,
        actor_id: str,
        name: str,
        build: str | None = None,
        run_timeout: timedelta | None = None,
        memory_mbytes: int | None = None,
        max_items: int | None = None,
        restart_on_error: bool | None = None,
        task_input: TaskInputDict | TaskInput | None = None,
        title: str | None = None,
        actor_standby_desired_requests_per_actor_run: int | None = None,
        actor_standby_max_requests_per_actor_run: int | None = None,
        actor_standby_idle_timeout: timedelta | None = None,
        actor_standby_build: str | None = None,
        actor_standby_memory_mbytes: int | None = None,
        timeout: Timeout = 'medium',
    ) -> Task:
        """Create a new task for the given Actor.

        https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/create-task

        Args:
            actor_id: ID of the Actor the task should run.
            name: Name of the task.
            build: Actor build to run, given as a build tag or build number. When omitted, runs use the build
                from the task settings (typically latest).
            run_timeout: Timeout for runs of this task. When omitted, runs use the timeout from the task settings.
                Sent to the API as whole seconds.
            memory_mbytes: Memory limit for runs, in megabytes. When omitted, runs use the memory limit from
                the task settings.
            max_items: Maximum number of results returned by runs of this task. If the task's Actor is charged
                per result, you are not charged for more results than this limit.
            restart_on_error: If true, the task's run process is restarted whenever it exits with a non-zero
                status code.
            task_input: Input for the task, either as a `TaskInput` model or as a plain dictionary.
            title: Human-friendly counterpart of the name.
            actor_standby_desired_requests_per_actor_run: Desired number of concurrent HTTP requests for a single
                Actor Standby run.
            actor_standby_max_requests_per_actor_run: Maximum number of concurrent HTTP requests for a single
                Actor Standby run.
            actor_standby_idle_timeout: Time without incoming requests after which the Actor run is shut down.
                Sent to the API as whole seconds.
            actor_standby_build: Build tag or number to run when the Actor is in Standby mode.
            actor_standby_memory_mbytes: Memory in megabytes to use when the Actor is in Standby mode.
            timeout: Timeout for the API HTTP request.

        Returns:
            The newly created task.
        """
        if not (task_input is None or isinstance(task_input, TaskInput)):
            # Anything other than None or a ready-made model (e.g. a `TaskInputDict`) is validated into `TaskInput`.
            task_input = TaskInput.model_validate(task_input)

        run_options = TaskOptions(
            build=build,
            max_items=max_items,
            memory_mbytes=memory_mbytes,
            timeout_secs=to_seconds(run_timeout, as_int=True),
            restart_on_error=restart_on_error,
        )
        standby_options = ActorStandby(
            desired_requests_per_actor_run=actor_standby_desired_requests_per_actor_run,
            max_requests_per_actor_run=actor_standby_max_requests_per_actor_run,
            idle_timeout_secs=to_seconds(actor_standby_idle_timeout, as_int=True),
            build=actor_standby_build,
            memory_mbytes=actor_standby_memory_mbytes,
        )
        request = CreateTaskRequest(
            act_id=actor_id,
            name=name,
            title=title,
            input=task_input,
            options=run_options,
            actor_standby=standby_options,
        )

        # Use API field aliases and drop None-valued fields. NOTE: the nested option models are always
        # constructed, so they are still serialized (possibly as empty objects) even if all their fields are None.
        payload = request.model_dump(by_alias=True, exclude_none=True)
        response = self._create(timeout=timeout, **payload)
        return TaskResponse.model_validate(response).data
@docs_group('Resource clients')
class TaskCollectionClientAsync(ResourceClientAsync):
    """Asynchronous sub-client for the collection of Actor tasks.

    Offers operations on the task collection as a whole, such as listing existing tasks and creating new ones.
    Instances are normally obtained through the corresponding method of the `ApifyClientAsync` class.
    """

    def __init__(
        self,
        *,
        resource_path: str = 'actor-tasks',
        **kwargs: Any,
    ) -> None:
        """Set up the client; `resource_path` and all remaining keyword arguments are forwarded to the base class."""
        super().__init__(resource_path=resource_path, **kwargs)

    async def list(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
        timeout: Timeout = 'medium',
    ) -> ListOfTasks:
        """Fetch one page of the available tasks with a single API call.

        https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks

        Args:
            limit: Maximum number of tasks to return.
            offset: Number of tasks to skip before the first returned one.
            desc: If true, sort the tasks by creation date in descending order.
            timeout: Timeout for the API HTTP request.

        Returns:
            The page of tasks matching the given filters.
        """
        raw_page = await self._list(timeout=timeout, limit=limit, offset=offset, desc=desc)
        envelope = ListOfTasksResponse.model_validate(raw_page)
        return envelope.data

    def iterate(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
        timeout: Timeout = 'medium',
    ) -> AsyncIterator[TaskShort]:
        """Asynchronously iterate over all available tasks, fetching further pages as needed.

        Unlike `list`, which performs exactly one API call and therefore may not return every matching task,
        the returned async iterator can issue several API calls to walk through all tasks matching the criteria.

        https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks

        Args:
            limit: Maximum total number of tasks to yield.
            offset: Number of tasks to skip before the first yielded one.
            desc: If true, sort the tasks by creation date in descending order.
            timeout: Timeout applied to each underlying API HTTP request.

        Yields:
            The tasks matching the given filters.
        """

        async def _fetch_page(*, limit: int | None = None, offset: int | None = None) -> ListOfTasks:
            # The pagination helper supplies the page window; ordering and timeout stay fixed for every page.
            return await self.list(limit=limit, offset=offset, desc=desc, timeout=timeout)

        return get_items_iterator_async(_fetch_page, limit=limit, offset=offset)

    async def create(
        self,
        *,
        actor_id: str,
        name: str,
        build: str | None = None,
        run_timeout: timedelta | None = None,
        memory_mbytes: int | None = None,
        max_items: int | None = None,
        restart_on_error: bool | None = None,
        task_input: TaskInputDict | TaskInput | None = None,
        title: str | None = None,
        actor_standby_desired_requests_per_actor_run: int | None = None,
        actor_standby_max_requests_per_actor_run: int | None = None,
        actor_standby_idle_timeout: timedelta | None = None,
        actor_standby_build: str | None = None,
        actor_standby_memory_mbytes: int | None = None,
        timeout: Timeout = 'medium',
    ) -> Task:
        """Create a new task for the given Actor.

        https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/create-task

        Args:
            actor_id: ID of the Actor the task should run.
            name: Name of the task.
            build: Actor build to run, given as a build tag or build number. When omitted, runs use the build
                from the task settings (typically latest).
            run_timeout: Timeout for runs of this task. When omitted, runs use the timeout from the task settings.
                Sent to the API as whole seconds.
            memory_mbytes: Memory limit for runs, in megabytes. When omitted, runs use the memory limit from
                the task settings.
            max_items: Maximum number of results returned by runs of this task. If the task's Actor is charged
                per result, you are not charged for more results than this limit.
            restart_on_error: If true, the task's run process is restarted whenever it exits with a non-zero
                status code.
            task_input: Input for the task, either as a `TaskInput` model or as a plain dictionary.
            title: Human-friendly counterpart of the name.
            actor_standby_desired_requests_per_actor_run: Desired number of concurrent HTTP requests for a single
                Actor Standby run.
            actor_standby_max_requests_per_actor_run: Maximum number of concurrent HTTP requests for a single
                Actor Standby run.
            actor_standby_idle_timeout: Time without incoming requests after which the Actor run is shut down.
                Sent to the API as whole seconds.
            actor_standby_build: Build tag or number to run when the Actor is in Standby mode.
            actor_standby_memory_mbytes: Memory in megabytes to use when the Actor is in Standby mode.
            timeout: Timeout for the API HTTP request.

        Returns:
            The newly created task.
        """
        if not (task_input is None or isinstance(task_input, TaskInput)):
            # Anything other than None or a ready-made model (e.g. a `TaskInputDict`) is validated into `TaskInput`.
            task_input = TaskInput.model_validate(task_input)

        run_options = TaskOptions(
            build=build,
            max_items=max_items,
            memory_mbytes=memory_mbytes,
            timeout_secs=to_seconds(run_timeout, as_int=True),
            restart_on_error=restart_on_error,
        )
        standby_options = ActorStandby(
            desired_requests_per_actor_run=actor_standby_desired_requests_per_actor_run,
            max_requests_per_actor_run=actor_standby_max_requests_per_actor_run,
            idle_timeout_secs=to_seconds(actor_standby_idle_timeout, as_int=True),
            build=actor_standby_build,
            memory_mbytes=actor_standby_memory_mbytes,
        )
        request = CreateTaskRequest(
            act_id=actor_id,
            name=name,
            title=title,
            input=task_input,
            options=run_options,
            actor_standby=standby_options,
        )

        # Use API field aliases and drop None-valued fields. NOTE: the nested option models are always
        # constructed, so they are still serialized (possibly as empty objects) even if all their fields are None.
        payload = request.model_dump(by_alias=True, exclude_none=True)
        response = await self._create(timeout=timeout, **payload)
        return TaskResponse.model_validate(response).data