From 3295619cfbe7bb71c7c7546076c64b4239eee2c6 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 27 Jan 2026 16:47:12 +0000 Subject: [PATCH 1/5] WIP returning results from plans --- src/blueapi/core/context.py | 2 +- src/blueapi/worker/event.py | 2 ++ src/blueapi/worker/task.py | 5 ++++- src/blueapi/worker/task_worker.py | 7 +++++-- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/blueapi/core/context.py b/src/blueapi/core/context.py index 0d662d13a7..40c8a10f43 100644 --- a/src/blueapi/core/context.py +++ b/src/blueapi/core/context.py @@ -120,7 +120,7 @@ class BlueskyContext: configuration: InitVar[ApplicationConfig | None] = None run_engine: RunEngine = field( - default_factory=lambda: RunEngine(context_managers=[]) + default_factory=lambda: RunEngine(context_managers=[], call_returns_result=True) ) tiled_conf: TiledConfig | None = field(default=None, init=False, repr=False) numtracker: NumtrackerClient | None = field(default=None, init=False, repr=False) diff --git a/src/blueapi/worker/event.py b/src/blueapi/worker/event.py index 8bd99d63a3..ccf711e089 100644 --- a/src/blueapi/worker/event.py +++ b/src/blueapi/worker/event.py @@ -1,5 +1,6 @@ from collections.abc import Mapping from enum import Enum +from typing import Any from bluesky.run_engine import RunEngineStateMachine from pydantic import Field @@ -109,6 +110,7 @@ class TaskStatus(BlueapiBaseModel): task_id: str task_complete: bool task_failed: bool + result: Any = None class WorkerEvent(BlueapiBaseModel): diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py index 32060bc647..669b7fb05f 100644 --- a/src/blueapi/worker/task.py +++ b/src/blueapi/worker/task.py @@ -38,7 +38,10 @@ def do_task(self, ctx: BlueskyContext) -> None: func = ctx.plan_functions[self.name] prepared_params = self.prepare_params(ctx) ctx.run_engine.md.update(self.metadata) - ctx.run_engine(func(**prepared_params)) + result = ctx.run_engine(func(**prepared_params)) + if isinstance(result, tuple): + return None + return result.plan_result def _lookup_params(ctx: BlueskyContext, task: Task) -> BaseModel: diff --git a/src/blueapi/worker/task_worker.py b/src/blueapi/worker/task_worker.py index 62435d2618..e25b2e7522 100644 --- a/src/blueapi/worker/task_worker.py +++ b/src/blueapi/worker/task_worker.py @@ -69,6 +69,7 @@ class TrackableTask(BlueapiBaseModel): is_complete: bool = False is_pending: bool = True errors: list[str] = Field(default_factory=list) + result: Any | None = None class TaskWorker: @@ -423,11 +424,12 @@ def _cycle(self) -> None: next_task: TrackableTask | KillSignal = self._task_channel.get() if isinstance(next_task, TrackableTask): - def process_task(): + def process_task() -> Any: LOGGER.info(f"Got new task: {next_task}") self._current = next_task self._current.is_pending = False - self._current.task.do_task(self._ctx) + result = self._current.task.do_task(self._ctx) + self._current.result = result with plan_tag_filter_context(next_task.task.name, LOGGER): if self._current_task_otel_context is not None: @@ -528,6 +530,7 @@ def _report_status( task_id=self._current.task_id, task_complete=self._current.is_complete, task_failed=bool(self._current.errors), + result=self._current.result, ) correlation_id = self._current.task_id add_span_attributes( From 7f1d941bd62116297fd3e252cb2aff78a694e156 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Wed, 28 Jan 2026 17:25:27 +0000 Subject: [PATCH 2/5] Store plan result as python dict/list/str etc Use pydantic to convert result to something that can be JSON serialized later. --- src/blueapi/worker/event.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/blueapi/worker/event.py b/src/blueapi/worker/event.py index ccf711e089..1703e2caff 100644 --- a/src/blueapi/worker/event.py +++ b/src/blueapi/worker/event.py @@ -1,9 +1,10 @@ +import logging from collections.abc import Mapping from enum import Enum from typing import Any from bluesky.run_engine import RunEngineStateMachine -from pydantic import Field +from pydantic import Field, TypeAdapter, field_validator from super_state_machine.extras import PropertyMachine, ProxyString from blueapi.utils import BlueapiBaseModel @@ -12,6 +13,8 @@ # RawRunEngineState = type[PropertyMachine | ProxyString | str] RawRunEngineState = PropertyMachine | ProxyString | str +log = logging.getLogger(__name__) + # NOTE this is interim until refactor class TaskStatusEnum(str, Enum): @@ -112,6 +115,17 @@ class TaskStatus(BlueapiBaseModel): task_failed: bool result: Any = None + @field_validator("result") + @classmethod + def _serialize_result(cls, result): + try: + return TypeAdapter(type(result)).dump_python(result) + except Exception: + log.warning( + "Plan result type (%s) not serializable: %s", type(result), result + ) + pass + class WorkerEvent(BlueapiBaseModel): """ From 80a0af9db3a8e1f044ddc987bd2aaa30e1d97cb5 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 29 Jan 2026 16:14:16 +0000 Subject: [PATCH 3/5] Remove return type from process task --- src/blueapi/worker/task_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blueapi/worker/task_worker.py b/src/blueapi/worker/task_worker.py index e25b2e7522..a2bcb9a144 100644 --- a/src/blueapi/worker/task_worker.py +++ b/src/blueapi/worker/task_worker.py @@ -424,7 +424,7 @@ def _cycle(self) -> None: next_task: TrackableTask | KillSignal = self._task_channel.get() if isinstance(next_task, TrackableTask): - def process_task() -> Any: + def process_task(): LOGGER.info(f"Got new task: {next_task}") self._current = next_task self._current.is_pending = False From 21afa3987686108dac0b8cf124311ecab515c2a9 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 29 Jan 2026 17:09:07 +0000 Subject: [PATCH 4/5] Handle serialization of TrackableTask as well as TaskStatus --- src/blueapi/utils/base_model.py | 22 +++++++++++++++++++++- src/blueapi/worker/event.py | 17 +++-------------- src/blueapi/worker/task_worker.py | 4 ++-- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/blueapi/utils/base_model.py b/src/blueapi/utils/base_model.py index 142052a8c3..daad9a0bd4 100644 --- a/src/blueapi/utils/base_model.py +++ b/src/blueapi/utils/base_model.py @@ -1,5 +1,14 @@ -from pydantic import BaseModel, ConfigDict +import logging +from typing import Annotated, Any +from pydantic import ( + BaseModel, + ConfigDict, + PlainSerializer, + TypeAdapter, +) + +logger = logging.getLogger(__name__) # Pydantic config for blueapi API models with common config. BlueapiModelConfig = ConfigDict( extra="forbid", @@ -16,6 +25,17 @@ ) +def _safe_serialize(value: Any) -> Any: + """Try serializing but skip any type that pydantic can't handle""" + try: + return TypeAdapter(type(value)).dump_python(value, mode="json") + except Exception: + logger.warning("Type '%s' not serializable: %s", type(value), value) + + +NoneFallback = Annotated[Any, PlainSerializer(_safe_serialize)] + + class BlueapiBaseModel(BaseModel): """ Base class for blueapi API models. diff --git a/src/blueapi/worker/event.py b/src/blueapi/worker/event.py index 1703e2caff..a0d8e047bc 100644 --- a/src/blueapi/worker/event.py +++ b/src/blueapi/worker/event.py @@ -1,13 +1,13 @@ import logging from collections.abc import Mapping from enum import Enum -from typing import Any from bluesky.run_engine import RunEngineStateMachine -from pydantic import Field, TypeAdapter, field_validator +from pydantic import Field from super_state_machine.extras import PropertyMachine, ProxyString from blueapi.utils import BlueapiBaseModel +from blueapi.utils.base_model import NoneFallback # The RunEngine can return any of these three types as its state # RawRunEngineState = type[PropertyMachine | ProxyString | str] @@ -113,18 +113,7 @@ class TaskStatus(BlueapiBaseModel): task_id: str task_complete: bool task_failed: bool - result: Any = None - - @field_validator("result") - @classmethod - def _serialize_result(cls, result): - try: - return TypeAdapter(type(result)).dump_python(result) - except Exception: - log.warning( - "Plan result type (%s) not serializable: %s", type(result), result - ) - pass + result: NoneFallback = None class WorkerEvent(BlueapiBaseModel): diff --git a/src/blueapi/worker/task_worker.py b/src/blueapi/worker/task_worker.py index a2bcb9a144..808c2f9e8c 100644 --- a/src/blueapi/worker/task_worker.py +++ b/src/blueapi/worker/task_worker.py @@ -32,7 +32,7 @@ ) from blueapi.core.bluesky_event_loop import configure_bluesky_event_loop from blueapi.log import plan_tag_filter_context -from blueapi.utils.base_model import BlueapiBaseModel +from blueapi.utils.base_model import BlueapiBaseModel, NoneFallback from blueapi.utils.thread_exception import handle_all_exceptions from .event import ( @@ -69,7 +69,7 @@ class TrackableTask(BlueapiBaseModel): is_complete: bool = False is_pending: bool = True errors: list[str] = Field(default_factory=list) - result: Any | None = None + result: NoneFallback = None class TaskWorker: From 7670dbe9781974e2887de3c32a6523fc76e33e1a Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Thu, 29 Jan 2026 17:23:37 +0000 Subject: [PATCH 5/5] Update tests --- docs/reference/openapi.yaml | 2 ++ tests/unit_tests/service/test_rest_api.py | 5 +++++ tests/unit_tests/test_cli.py | 9 ++++++--- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/docs/reference/openapi.yaml b/docs/reference/openapi.yaml index 499e8cc52d..1380e642b7 100644 --- a/docs/reference/openapi.yaml +++ b/docs/reference/openapi.yaml @@ -314,6 +314,8 @@ components: request_id: title: Request Id type: string + result: + title: Result task: $ref: '#/components/schemas/Task' task_id: diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index b0bc09dd33..12555cc784 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -336,6 +336,7 @@ def test_get_tasks(mock_runner: Mock, client: TestClient) -> None: "params": {"time": 0.0}, "metadata": {}, }, + "result": None, "task_id": "0", }, { @@ -348,6 +349,7 @@ def test_get_tasks(mock_runner: Mock, client: TestClient) -> None: "params": {}, "metadata": {}, }, + "result": None, "task_id": "1", }, ] @@ -379,6 +381,7 @@ def test_get_tasks_by_status(mock_runner: Mock, client: TestClient) -> None: "params": {}, "metadata": {}, }, + "result": None, "task_id": "3", } ] @@ -472,6 +475,7 @@ def test_get_task(mock_runner: Mock, client: TestClient): "foo": "bar", }, }, + "result": None, "task_id": f"{task_id}", } @@ -500,6 +504,7 @@ def test_get_all_tasks(mock_runner: Mock, client: TestClient): "is_complete": False, "is_pending": True, "request_id": None, + "result": None, "errors": [], } ] diff --git a/tests/unit_tests/test_cli.py b/tests/unit_tests/test_cli.py index 2a49a1fe80..1aaf585581 100644 --- a/tests/unit_tests/test_cli.py +++ b/tests/unit_tests/test_cli.py @@ -929,9 +929,12 @@ def test_event_formatting(): OutputFormat.JSON, worker, ( - """{"state": "RUNNING", "task_status": """ - """{"task_id": "count", "task_complete": false, "task_failed": false}, """ - """"errors": [], "warnings": []}\n""" + '{"state": "RUNNING", "task_status": {' + '"task_id": "count", ' + '"task_complete": false, ' + '"task_failed": false, ' + '"result": null' + '}, "errors": [], "warnings": []}\n' ), ) _assert_matching_formatting(OutputFormat.COMPACT, worker, "Worker Event: RUNNING\n")