feat(rtc): Add event_types filtering to FfiQueue.subscribe() to reduce memory allocations #564
+365
−8
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
Fix for #563
This PR adds an optional
event_typesparameter toFfiQueue.subscribe()that allows subscribers to filter events by type beforecall_soon_threadsafe()is called, preventing unnecessary object allocation.Problem
FfiQueue.put()broadcasts ALL FFI events to ALL subscribers viacall_soon_threadsafe(). Each call createsasyncio.Handle+contextvars.copy_context()objects.AudioStreamandVideoStreamfilter events withwait_for(predicate), but objects are already allocated by then.With N streams subscribed, this creates N × all_events objects, with 95%+ discarded after allocation.
Real-world impact observed in a 30-min meeting with 4 participants:
Solution
event_types: Optional[Set[str]]parameter toFfiQueue.subscribe()WhichOneof("message")before callingcall_soon_threadsafe()AudioStreamnow subscribes withevent_types={"audio_stream_event"}VideoStreamnow subscribes withevent_types={"video_stream_event"}event_types=Nonereceives all events (original behavior)Results
With the patch applied:
Testing
Files Changed
livekit-rtc/livekit/rtc/_ffi_client.py- Addevent_typesfiltering toFfiQueuelivekit-rtc/livekit/rtc/audio_stream.py- Use filtered subscriptionlivekit-rtc/livekit/rtc/video_stream.py- Use filtered subscriptiontests/rtc/test_ffi_queue.py- Unit tests for filtering