Skip to content

Conversation

@Hormold
Copy link

@Hormold Hormold commented Feb 2, 2026

Summary

Fix for #563
This PR adds an optional event_types parameter to FfiQueue.subscribe() that allows subscribers to filter events by type before call_soon_threadsafe() is called, preventing unnecessary object allocation.

Problem

FfiQueue.put() broadcasts ALL FFI events to ALL subscribers via call_soon_threadsafe(). Each call creates asyncio.Handle + contextvars.copy_context() objects. AudioStream and VideoStream filter events with wait_for(predicate), but objects are already allocated by then.

With N streams subscribed, this creates N × all_events objects, with 95%+ discarded after allocation.

Real-world impact observed in a 30-min meeting with 4 participants:

  • 903,154 FFI events accumulated
  • Memory grew from 312 MB to 1.29 GB (~12-15 MB/min)
  • Event loop lag increased to 20+ seconds
  • Transcription freezing due to event loop starvation

Solution

  • Add optional event_types: Optional[Set[str]] parameter to FfiQueue.subscribe()
  • Filter events by WhichOneof("message") before calling call_soon_threadsafe()
  • AudioStream now subscribes with event_types={"audio_stream_event"}
  • VideoStream now subscribes with event_types={"video_stream_event"}
  • Full backwards compatibility: event_types=None receives all events (original behavior)

Results

With the patch applied:

  • 95% reduction in object allocations for stream subscribers
  • Memory stable at ~10 MB instead of growing to 1+ GB
  • No event loop lag or transcription freezing

Testing

  • Added unit tests for event filtering functionality
  • Verified filtering behavior with multiple subscribers and mixed event types
  • Tested in production environment with stable memory usage over 2+ hours

Files Changed

  • livekit-rtc/livekit/rtc/_ffi_client.py - Add event_types filtering to FfiQueue
  • livekit-rtc/livekit/rtc/audio_stream.py - Use filtered subscription
  • livekit-rtc/livekit/rtc/video_stream.py - Use filtered subscription
  • tests/rtc/test_ffi_queue.py - Unit tests for filtering

PROBLEM:
FfiQueue.put() broadcasts ALL FFI events to ALL subscribers via
call_soon_threadsafe(). Each call creates asyncio.Handle + context objects.
AudioStream/VideoStream filter events with wait_for(predicate), but objects
are already allocated. With N streams, this creates N × all_events objects,
with 95%+ discarded after allocation.

In a 2-hour meeting with 4 participants, we observed:
- 903,154 FFI events accumulated
- Memory grew from 312 MB to 1.29 GB
- Event loop lag increased to 20+ seconds

SOLUTION:
Add optional `event_types` parameter to FfiQueue.subscribe(). When specified,
events are filtered by type BEFORE calling call_soon_threadsafe(), preventing
unnecessary object allocation.

AudioStream now subscribes with event_types={"audio_stream_event"}
VideoStream now subscribes with event_types={"video_stream_event"}

This reduces memory allocations by ~95% for stream subscribers while
maintaining full backwards compatibility (event_types=None = all events).

TESTING:
- Added unit tests for event filtering functionality
- Verified 95% reduction in object creation with filtered subscribers
- Tested in production environment with stable memory usage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@Hormold Hormold marked this pull request as draft February 2, 2026 23:36
…tion

Addresses review feedback: FfiQueue is Generic[T], so we can't assume
item has WhichOneof method. Instead, use a filter_fn callback that the
caller provides - this keeps FfiQueue generic while allowing filtering.

- FfiQueue.subscribe() now takes optional filter_fn: Callable[[T], bool]
- AudioStream/VideoStream provide the filter that knows the concrete type
- Tests updated to use filter_fn approach

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant