Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions src/blueapi/client/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import time
from concurrent.futures import Future

from bluesky_stomp.messaging import MessageContext, StompClient
from bluesky_stomp.models import Broker
Expand Down Expand Up @@ -224,7 +223,9 @@ def run_task(
task_response = self.create_task(task)
task_id = task_response.task_id

complete: Future[WorkerEvent] = Future()
from queue import SimpleQueue

event_queue = SimpleQueue()

def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
match event:
Expand All @@ -237,26 +238,32 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
case _:
relates_to_task = False
if relates_to_task:
if on_event is not None:
on_event(event)
event_queue.put((event, ctx))

with self._events:
self._events.subscribe_to_all_events(inner_on_event)
self._rest.update_worker_task(WorkerTask(task_id=task_id))
while True:
event, ctx = event_queue.get()
if on_event:
try:
on_event(event)
except Exception as e:
import logging

logging.error(
f"Callback ({on_event}) failed for event: {event}",
exc_info=e,
)
if isinstance(event, WorkerEvent) and (
(event.is_complete()) and (ctx.correlation_id == task_id)
):
if event.task_status is not None and event.task_status.task_failed:
complete.set_exception(
BlueskyStreamingError(
"\n".join(event.errors)
if len(event.errors) > 0
else "Unknown error"
)
raise BlueskyStreamingError(
"\n".join(event.errors) if event.errors else "Unknown error"
)
else:
complete.set_result(event)

with self._events:
self._events.subscribe_to_all_events(inner_on_event)
self.start_task(WorkerTask(task_id=task_id))
return complete.result(timeout=timeout)
return event

@start_as_current_span(TRACER, "task")
def create_and_start_task(self, task: TaskRequest) -> TaskResponse:
Expand Down
Loading