Skip to content

Commit 68bd0f3

Browse files
committed
Add response_watcher to wait_for_finish and use it to redirect status if desired
1 parent 84bc946 commit 68bd0f3

4 files changed

Lines changed: 51 additions & 19 deletions

File tree

src/apify_client/clients/base/actor_job_base_client.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import math
55
import time
66
from datetime import datetime, timezone
7+
from typing import Any, Protocol
78

89
from apify_shared.consts import ActorJobStatus
910

@@ -20,7 +21,9 @@
2021
class ActorJobBaseClient(ResourceClient):
2122
"""Base sub-client class for Actor runs and Actor builds."""
2223

23-
def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
24+
def _wait_for_finish(
25+
self, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
26+
) -> dict | None:
2427
started_at = datetime.now(timezone.utc)
2528
should_repeat = True
2629
job: dict | None = None
@@ -39,6 +42,9 @@ def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
3942
)
4043
job = parse_date_fields(pluck_data(response.json()))
4144

45+
if response_watcher:
46+
response_watcher(job)
47+
4248
seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
4349
if ActorJobStatus(job['status']).is_terminal or (
4450
wait_secs is not None and seconds_elapsed >= wait_secs
@@ -74,7 +80,9 @@ def _abort(self, *, gracefully: bool | None = None) -> dict:
7480
class ActorJobBaseClientAsync(ResourceClientAsync):
7581
"""Base async sub-client class for Actor runs and Actor builds."""
7682

77-
async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
83+
async def _wait_for_finish(
84+
self, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
85+
) -> dict | None:
7886
started_at = datetime.now(timezone.utc)
7987
should_repeat = True
8088
job: dict | None = None
@@ -93,6 +101,9 @@ async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
93101
)
94102
job = parse_date_fields(pluck_data(response.json()))
95103

104+
if response_watcher:
105+
response_watcher(job)
106+
96107
seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
97108
if ActorJobStatus(job['status']).is_terminal or (
98109
wait_secs is not None and seconds_elapsed >= wait_secs
@@ -123,3 +134,10 @@ async def _abort(self, *, gracefully: bool | None = None) -> dict:
123134
params=self._params(gracefully=gracefully),
124135
)
125136
return parse_date_fields(pluck_data(response.json()))
137+
138+
139+
class ResponseWatcher(Protocol):
140+
"""Protocol for a callable watching parsed responses from wait_for_finish methods."""
141+
142+
def __call__(self, response: dict[str, Any] | None) -> Any:
143+
"""Watch the parsed response and act if needed."""

src/apify_client/clients/resource_clients/actor.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,12 @@ def call(
358358
if logger == 'default':
359359
logger = None
360360

361-
with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
362-
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
361+
status_watcher = run_client.get_status_message_watcher(to_logger=logger)
362+
363+
with run_client.get_streamed_log(to_logger=logger):
364+
return self.root_client.run(started_run['id']).wait_for_finish(
365+
wait_secs=wait_secs, response_watcher=status_watcher.log_run_data
366+
)
363367

364368
def build(
365369
self,
@@ -779,11 +783,13 @@ async def call(
779783
if logger == 'default':
780784
logger = None
781785

782-
status_redirector = await run_client.get_status_message_watcher(to_logger=logger)
786+
status_watcher = await run_client.get_status_message_watcher(to_logger=logger)
783787
streamed_log = await run_client.get_streamed_log(to_logger=logger)
784788

785-
async with status_redirector, streamed_log:
786-
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)
789+
async with streamed_log:
790+
return await self.root_client.run(started_run['id']).wait_for_finish(
791+
wait_secs=wait_secs, response_watcher=status_watcher.log_run_data
792+
)
787793

788794
async def build(
789795
self,

src/apify_client/clients/resource_clients/log.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -404,25 +404,25 @@ def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timed
404404
self._check_period = check_period.total_seconds()
405405
self._last_status_message = ''
406406

407-
def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
407+
def log_run_data(self, response: dict[str, Any] | None) -> bool:
408408
"""Get relevant run data, log them if changed and return `True` if more data is expected.
409409
410410
Args:
411-
run_data: The dictionary that contains the run data.
411+
response: The dictionary that contains the parsed run data.
412412
413413
Returns:
414414
`True` if more data is expected, `False` otherwise.
415415
"""
416-
if run_data is not None:
417-
status = run_data.get('status', 'Unknown status')
418-
status_message = run_data.get('statusMessage', '')
416+
if response is not None:
417+
status = response.get('status', 'Unknown status')
418+
status_message = response.get('statusMessage', '')
419419
new_status_message = f'Status: {status}, Message: {status_message}'
420420

421421
if new_status_message != self._last_status_message:
422422
self._last_status_message = new_status_message
423423
self._to_logger.info(new_status_message)
424424

425-
return not (run_data.get('isStatusMessageTerminal', False))
425+
return not (response.get('isStatusMessageTerminal', False))
426426
return True
427427

428428

@@ -476,7 +476,7 @@ async def __aexit__(
476476
async def _log_changed_status_message(self) -> None:
477477
while True:
478478
run_data = await self._run_client.get()
479-
if not self._log_run_data(run_data):
479+
if not self.log_run_data(run_data):
480480
break
481481
await asyncio.sleep(self._check_period)
482482

@@ -531,7 +531,7 @@ def __exit__(
531531

532532
def _log_changed_status_message(self) -> None:
533533
while True:
534-
if not self._log_run_data(self._run_client.get()):
534+
if not self.log_run_data(self._run_client.get()):
535535
break
536536
if self._stop_logging:
537537
break

src/apify_client/clients/resource_clients/run.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535

3636
from apify_shared.consts import RunGeneralAccess
3737

38+
from apify_client.clients.base.actor_job_base_client import ResponseWatcher
39+
3840

3941
class RunClient(ActorJobBaseClient):
4042
"""Sub-client for manipulating a single Actor run."""
@@ -102,17 +104,20 @@ def abort(self, *, gracefully: bool | None = None) -> dict:
102104
"""
103105
return self._abort(gracefully=gracefully)
104106

105-
def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None:
107+
def wait_for_finish(
108+
self, *, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
109+
) -> dict | None:
106110
"""Wait synchronously until the run finishes or the server times out.
107111
108112
Args:
109113
wait_secs: How long does the client wait for run to finish. None for indefinite.
114+
response_watcher: A callback function that will be called with parsed Actor run data.
110115
111116
Returns:
112117
The Actor run data. If the status on the object is not one of the terminal statuses (SUCEEDED, FAILED,
113118
TIMED_OUT, ABORTED), then the run has not yet finished.
114119
"""
115-
return self._wait_for_finish(wait_secs=wait_secs)
120+
return self._wait_for_finish(wait_secs=wait_secs, response_watcher=response_watcher)
116121

117122
def metamorph(
118123
self,
@@ -417,17 +422,20 @@ async def abort(self, *, gracefully: bool | None = None) -> dict:
417422
"""
418423
return await self._abort(gracefully=gracefully)
419424

420-
async def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None:
425+
async def wait_for_finish(
426+
self, *, wait_secs: int | None = None, response_watcher: ResponseWatcher | None = None
427+
) -> dict | None:
421428
"""Wait synchronously until the run finishes or the server times out.
422429
423430
Args:
424431
wait_secs: How long does the client wait for run to finish. None for indefinite.
432+
response_watcher: A callback function that will be called with parsed Actor run data.
425433
426434
Returns:
427435
The Actor run data. If the status on the object is not one of the terminal statuses (SUCEEDED, FAILED,
428436
TIMED_OUT, ABORTED), then the run has not yet finished.
429437
"""
430-
return await self._wait_for_finish(wait_secs=wait_secs)
438+
return await self._wait_for_finish(wait_secs=wait_secs, response_watcher=response_watcher)
431439

432440
async def delete(self) -> None:
433441
"""Delete the run.

0 commit comments

Comments
 (0)