diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 0930e240a..7330ebc31 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -1,5 +1,4 @@ import time -from concurrent.futures import Future from bluesky_stomp.messaging import MessageContext, StompClient from bluesky_stomp.models import Broker @@ -224,7 +223,9 @@ def run_task( task_response = self.create_task(task) task_id = task_response.task_id - complete: Future[WorkerEvent] = Future() + from queue import SimpleQueue + + event_queue = SimpleQueue() def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None: match event: @@ -237,26 +238,32 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None: case _: relates_to_task = False if relates_to_task: - if on_event is not None: - on_event(event) + event_queue.put((event, ctx)) + + with self._events: + self._events.subscribe_to_all_events(inner_on_event) + self._rest.update_worker_task(WorkerTask(task_id=task_id)) + while True: + event, ctx = event_queue.get() + if on_event: + try: + on_event(event) + except Exception as e: + import logging + + logging.error( + f"Callback ({on_event}) failed for event: {event}", + exc_info=e, + ) if isinstance(event, WorkerEvent) and ( (event.is_complete()) and (ctx.correlation_id == task_id) ): if event.task_status is not None and event.task_status.task_failed: - complete.set_exception( - BlueskyStreamingError( - "\n".join(event.errors) - if len(event.errors) > 0 - else "Unknown error" - ) + raise BlueskyStreamingError( + "\n".join(event.errors) if event.errors else "Unknown error" ) else: - complete.set_result(event) - - with self._events: - self._events.subscribe_to_all_events(inner_on_event) - self.start_task(WorkerTask(task_id=task_id)) - return complete.result(timeout=timeout) + return event @start_as_current_span(TRACER, "task") def create_and_start_task(self, task: TaskRequest) -> TaskResponse: