Skip to content

Commit 8c0ada5

Browse files
feat: Add plan name to logs (#1178)
Fixes #883 --------- Co-authored-by: Joseph Ware <[email protected]>
1 parent be2f7b3 commit 8c0ada5

File tree

10 files changed

+169
-42
lines changed

10 files changed

+169
-42
lines changed

docs/reference/openapi.yaml

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,28 @@ components:
224224
- new_state
225225
title: StateChangeRequest
226226
type: object
227+
Task:
228+
additionalProperties: false
229+
description: Task that will run a plan
230+
properties:
231+
metadata:
232+
additionalProperties: true
233+
description: Any metadata to apply to all runs within this task
234+
title: Metadata
235+
type: object
236+
name:
237+
description: Name of plan to run
238+
title: Name
239+
type: string
240+
params:
241+
additionalProperties: true
242+
description: Values for parameters to plan, if any
243+
title: Params
244+
type: object
245+
required:
246+
- name
247+
title: Task
248+
type: object
227249
TaskRequest:
228250
additionalProperties: false
229251
description: Request to run a task with related info
@@ -293,7 +315,7 @@ components:
293315
title: Request Id
294316
type: string
295317
task:
296-
title: Task
318+
$ref: '#/components/schemas/Task'
297319
task_id:
298320
title: Task Id
299321
type: string
@@ -355,7 +377,7 @@ info:
355377
name: Apache 2.0
356378
url: https://www.apache.org/licenses/LICENSE-2.0.html
357379
title: BlueAPI Control
358-
version: 1.1.0
380+
version: 1.1.1
359381
openapi: 3.1.0
360382
paths:
361383
/config/oidc:

src/blueapi/client/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
TasksListResponse,
2626
WorkerTask,
2727
)
28-
from blueapi.worker import Task, TrackableTask, WorkerEvent, WorkerState
28+
from blueapi.worker import TrackableTask, WorkerEvent, WorkerState
2929
from blueapi.worker.event import ProgressEvent, TaskStatus
3030

3131
from .event_bus import AnyEvent, BlueskyStreamingError, EventBusClient, OnAnyEvent
@@ -159,15 +159,15 @@ def resume(self) -> WorkerState:
159159
return self._rest.set_state(WorkerState.RUNNING, defer=False)
160160

161161
@start_as_current_span(TRACER, "task_id")
162-
def get_task(self, task_id: str) -> TrackableTask[Task]:
162+
def get_task(self, task_id: str) -> TrackableTask:
163163
"""
164164
Get a task stored by the worker
165165
166166
Args:
167167
task_id: Unique ID for the task
168168
169169
Returns:
170-
TrackableTask[Task]: Task details
170+
TrackableTask: Task details
171171
"""
172172
assert task_id, "Task ID not provided!"
173173
return self._rest.get_task(task_id)

src/blueapi/client/rest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
TasksListResponse,
2727
WorkerTask,
2828
)
29-
from blueapi.worker import Task, TrackableTask, WorkerState
29+
from blueapi.worker import TrackableTask, WorkerState
3030

3131
T = TypeVar("T")
3232

@@ -172,8 +172,8 @@ def set_state(
172172
data={"new_state": state, "defer": defer},
173173
)
174174

175-
def get_task(self, task_id: str) -> TrackableTask[Task]:
176-
return self._request_and_deserialize(f"/tasks/{task_id}", TrackableTask[Task])
175+
def get_task(self, task_id: str) -> TrackableTask:
176+
return self._request_and_deserialize(f"/tasks/{task_id}", TrackableTask)
177177

178178
def get_all_tasks(self) -> TasksListResponse:
179179
return self._request_and_deserialize("/tasks", TasksListResponse)

src/blueapi/log.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import os
3+
from contextlib import contextmanager
34

45
from graypy import GELFTCPHandler
56

@@ -21,6 +22,39 @@ def filter(self, record: logging.LogRecord) -> bool:
2122
return True
2223

2324

25+
class PlanTagFilter(logging.Filter):
26+
"""Filter to attach name of plan as an attribute to LogRecords.
27+
28+
Attaches the attribute `plan_name` to all LogRecords that are passed through.
29+
"""
30+
31+
def __init__(self, plan_name: str):
32+
self.plan_name = plan_name
33+
34+
def filter(self, record: logging.LogRecord) -> bool:
35+
record.plan_name = self.plan_name
36+
return True
37+
38+
39+
@contextmanager
40+
def plan_tag_filter_context(plan_name: str, logger: logging.Logger):
41+
"""Context manager that attaches and removes `PlanTagFilter` to a given logger.
42+
43+
Creates an instance of PlanTagFilter and attaches it to the given logger for the
44+
duration of the context. On exit the filter is removed.
45+
46+
Args:
47+
plan_name: str name of plan being executed
48+
logger: logging.Logger to attach filter to
49+
"""
50+
filter = PlanTagFilter(plan_name)
51+
try:
52+
logger.addFilter(filter)
53+
yield
54+
finally:
55+
logger.removeFilter(filter)
56+
57+
2458
def set_up_logging(logging_config: LoggingConfig) -> None:
2559
"""Configure root level logger for blueapi.
2660

src/blueapi/service/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
from .runner import WorkerDispatcher
5959

6060
#: API version to publish in OpenAPI schema
61-
REST_API_VERSION = "1.1.0"
61+
REST_API_VERSION = "1.1.1"
6262

6363
LICENSE_INFO: dict[str, str] = {
6464
"name": "Apache 2.0",

src/blueapi/worker/task_worker.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from functools import partial
77
from queue import Full, Queue
88
from threading import Event, RLock
9-
from typing import Any, Generic, TypeVar
9+
from typing import Any, TypeVar
1010

1111
from bluesky.protocols import Status
1212
from observability_utils.tracing import (
@@ -31,6 +31,7 @@
3131
WatchableStatus,
3232
)
3333
from blueapi.core.bluesky_event_loop import configure_bluesky_event_loop
34+
from blueapi.log import plan_tag_filter_context
3435
from blueapi.utils.base_model import BlueapiBaseModel
3536
from blueapi.utils.thread_exception import handle_all_exceptions
3637

@@ -57,13 +58,13 @@
5758
T = TypeVar("T")
5859

5960

60-
class TrackableTask(BlueapiBaseModel, Generic[T]):
61+
class TrackableTask(BlueapiBaseModel):
6162
"""
6263
A representation of a task that the worker recognizes
6364
"""
6465

6566
task_id: str
66-
task: T
67+
task: Task
6768
request_id: str | SkipJsonSchema[None] = None
6869
is_complete: bool = False
6970
is_pending: bool = True
@@ -229,7 +230,7 @@ def get_tasks_by_status(self, status: TaskStatusEnum) -> list[TrackableTask]:
229230
return []
230231

231232
@start_as_current_span(TRACER)
232-
def get_active_task(self) -> TrackableTask[Task] | None:
233+
def get_active_task(self) -> TrackableTask | None:
233234
"""
234235
Returns the task the worker is currently running
235236
Returns:
@@ -254,7 +255,8 @@ def begin_task(self, task_id: str) -> None:
254255
if task is None:
255256
raise KeyError(f"No pending task with ID {task_id}")
256257
else:
257-
self._submit_trackable_task(task)
258+
with plan_tag_filter_context(task.task.name, LOGGER):
259+
self._submit_trackable_task(task)
258260

259261
@start_as_current_span(TRACER, "task.name", "task.params")
260262
def submit_task(self, task: Task) -> str:
@@ -419,19 +421,29 @@ def _cycle(self) -> None:
419421
LOGGER.info("Awaiting task")
420422
next_task: TrackableTask | KillSignal = self._task_channel.get()
421423
if isinstance(next_task, TrackableTask):
422-
if self._current_task_otel_context is not None:
423-
with TRACER.start_as_current_span(
424-
"_cycle",
425-
context=self._current_task_otel_context,
426-
kind=SpanKind.SERVER,
427-
):
428-
LOGGER.info(f"Got new task: {next_task}")
429-
self._current = next_task
430-
self._current_task_otel_context = get_current()
431-
add_span_attributes({"next_task.task_id": next_task.task_id})
432-
433-
self._current.is_pending = False
434-
self._current.task.do_task(self._ctx)
424+
425+
def process_task():
426+
LOGGER.info(f"Got new task: {next_task}")
427+
self._current = next_task
428+
self._current.is_pending = False
429+
self._current.task.do_task(self._ctx)
430+
431+
with plan_tag_filter_context(next_task.task.name, LOGGER):
432+
if self._current_task_otel_context is not None:
433+
with TRACER.start_as_current_span(
434+
"_cycle",
435+
context=self._current_task_otel_context,
436+
kind=SpanKind.SERVER,
437+
):
438+
self._current_task_otel_context = get_current()
439+
add_span_attributes(
440+
{"next_task.task_id": next_task.task_id}
441+
)
442+
443+
process_task()
444+
else:
445+
process_task()
446+
435447
elif isinstance(next_task, KillSignal):
436448
# If we receive a kill signal we begin to shut the worker down.
437449
# Note that the kill signal is explicitly not a type of task as we don't

tests/unit_tests/service/test_interface.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,9 @@ def test_begin_task_no_task_id(worker_mock: MagicMock):
234234

235235
@patch("blueapi.service.interface.TaskWorker.get_tasks_by_status")
236236
def test_get_tasks_by_status(get_tasks_by_status_mock: MagicMock):
237-
pending_task1 = TrackableTask(task_id="0", task=None)
238-
pending_task2 = TrackableTask(task_id="1", task=None)
239-
running_task = TrackableTask(task_id="2", task=None)
237+
pending_task1 = TrackableTask(task_id="0", task=Task(name="pending_task1"))
238+
pending_task2 = TrackableTask(task_id="1", task=Task(name="pending_task2"))
239+
running_task = TrackableTask(task_id="2", task=Task(name="running_task"))
240240

241241
def mock_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]:
242242
if status == TaskStatusEnum.PENDING:
@@ -307,9 +307,9 @@ def test_cancel_active_task(cancel_active_task_mock: MagicMock):
307307
@patch("blueapi.service.interface.TaskWorker.get_tasks")
308308
def test_get_tasks(get_tasks_mock: MagicMock):
309309
tasks = [
310-
TrackableTask(task_id="0", task=None),
311-
TrackableTask(task_id="1", task=None),
312-
TrackableTask(task_id="2", task=None),
310+
TrackableTask(task_id="0", task=Task(name="0")),
311+
TrackableTask(task_id="1", task=Task(name="1")),
312+
TrackableTask(task_id="2", task=Task(name="2")),
313313
]
314314
get_tasks_mock.return_value = tasks
315315

tests/unit_tests/service/test_rest_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ def test_put_plan_fails_if_not_idle(mock_runner: Mock, client: TestClient) -> No
300300

301301
# Set to non idle
302302
mock_runner.run.return_value = TrackableTask(
303-
task=None, task_id=task_id_current, is_complete=False
303+
task=Task(name="none"), task_id=task_id_current, is_complete=False
304304
)
305305

306306
resp = client.put("/worker/task", json={"task_id": task_id_new})

tests/unit_tests/test_log.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import logging
2-
from unittest.mock import patch
2+
from collections.abc import Generator
3+
from unittest.mock import Mock, patch
34

45
import pytest
56
from bluesky.log import logger as bluesky_logger
67
from dodal.log import LOGGER as dodal_logger
78

89
from blueapi.config import GraylogConfig, LoggingConfig
9-
from blueapi.log import set_up_logging
10+
from blueapi.log import PlanTagFilter, plan_tag_filter_context, set_up_logging
1011

1112

1213
def clear_all_loggers_and_handlers(logger):
@@ -23,7 +24,7 @@ def clear_all_loggers_and_handlers(logger):
2324

2425

2526
@pytest.fixture(params=LOGGER_NAMES)
26-
def logger_with_graylog(request):
27+
def logger_with_graylog(request) -> Generator[logging.Logger]:
2728
logger = logging.getLogger(request.param)
2829
graylog_config = GraylogConfig(enabled=True)
2930
set_up_logging(LoggingConfig(graylog=graylog_config))
@@ -32,7 +33,7 @@ def logger_with_graylog(request):
3233

3334

3435
@pytest.fixture(params=LOGGER_NAMES)
35-
def logger_without_graylog(request):
36+
def logger_without_graylog(request) -> Generator[logging.Logger]:
3637
logger = logging.getLogger(request.param)
3738
graylog_config = GraylogConfig(enabled=False)
3839
set_up_logging(LoggingConfig(graylog=graylog_config))
@@ -41,19 +42,19 @@ def logger_without_graylog(request):
4142

4243

4344
@pytest.fixture
44-
def logger(logger_with_graylog):
45+
def logger(logger_with_graylog) -> logging.Logger:
4546
return logger_with_graylog
4647

4748

4849
@pytest.fixture
49-
def mock_stream_handler_emit():
50+
def mock_stream_handler_emit() -> Generator[Mock]:
5051
with patch("blueapi.log.logging.StreamHandler.emit") as stream_handler_emit:
5152
# stream_handler_emit.reset_mock()
5253
yield stream_handler_emit
5354

5455

5556
@pytest.fixture
56-
def mock_graylog_emit():
57+
def mock_graylog_emit() -> Generator[Mock]:
5758
with patch("blueapi.log.GELFTCPHandler.emit") as graylog_emit:
5859
yield graylog_emit
5960

@@ -65,7 +66,7 @@ def mock_graylog_emit():
6566

6667

6768
@pytest.fixture(params=MOCK_HANDLER_EMIT_STRINGS)
68-
def mock_handler_emit(request):
69+
def mock_handler_emit(request) -> Generator[Mock]:
6970
with patch(request.param) as mock_emit:
7071
yield mock_emit
7172

@@ -101,3 +102,25 @@ def test_library_logger_intergrations(logger, library_logger, mock_handler_emit)
101102
mock_handler_emit.assert_not_called()
102103
library_logger.info("FOO")
103104
mock_handler_emit.assert_called()
105+
106+
107+
def test_plan_tag_filter_context_adds_filter(logger):
108+
assert logger.filters == []
109+
with plan_tag_filter_context("foo", logger):
110+
filters = logger.filters
111+
assert len(filters) == 1
112+
assert isinstance(filters[0], PlanTagFilter)
113+
114+
115+
def test_plan_tag_filter_context_removes_filter(logger):
116+
with plan_tag_filter_context("foo", logger):
117+
filters = logger.filters
118+
assert len(filters) == 1
119+
assert isinstance(filters[0], PlanTagFilter)
120+
assert logger.filters == []
121+
122+
123+
def test_filter_tags_plan_name(logger, mock_handler_emit):
124+
logger.addFilter(PlanTagFilter("foo"))
125+
logger.info("FOO")
126+
assert mock_handler_emit.call_args[0][0].plan_name == "foo"

0 commit comments

Comments
 (0)