Summary
When using multiple AudioStream instances (e.g., for multi-participant transcription), memory grows continuously because FfiQueue broadcasts ALL FFI events to ALL subscribers. Objects are allocated for every event and every subscriber before any filtering happens, so memory accumulates.
Environment
- livekit-rtc: latest
- Python: 3.13
- Use case: Real-time transcription agent processing multiple participants
Reproduction
- Create an agent that subscribes to audio from multiple participants
- For each participant, create rtc.AudioStream(track)
- Run for 30+ minutes with active audio
Each AudioStream.__init__ calls:
```python
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
```
Observed Behavior
Memory grows ~12-15 MB/min per active AudioStream. In a 30-min session with 4 participants:
Start: RSS=312 MB, FFI events=0
End: RSS=1291 MB, FFI events=903,154
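A quick back-of-the-envelope check of the session figures above shows the scale of the fan-out. It assumes exactly one FfiQueue subscriber per AudioStream and no other subscribers; that is an assumption, and the real subscriber count may be higher:

```python
# Session figures from the 30-minute, 4-participant run above.
DURATION_S = 30 * 60
FFI_EVENTS = 903_154

# Assumption: exactly one FfiQueue subscriber per AudioStream, no others.
SUBSCRIBERS = 4

events_per_s = FFI_EVENTS / DURATION_S
# Under broadcast, every event is scheduled once per subscriber.
scheduled_callbacks = FFI_EVENTS * SUBSCRIBERS

print(f"FFI events per second: {events_per_s:.0f}")  # 502
print(f"call_soon_threadsafe() calls: {scheduled_callbacks:,}")  # 3,612,616
```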
Tracemalloc shows accumulation at:
- _ffi_client.py:123 — loop.call_soon_threadsafe(queue.put_nowait, item)
- _ffi_client.py:151 — proto_ffi.FfiEvent()
- asyncio/base_events.py:853 — events.Handle()
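Allocation sites like these come from comparing two tracemalloc snapshots. A minimal sketch of such a before/after diff using only the standard library follows; top_growth is a hypothetical helper name, and the bytearray workload merely stands in for a running agent:

```python
import tracemalloc
from typing import Any, Callable, List, Tuple


def top_growth(
    workload: Callable[[], Any], limit: int = 5
) -> Tuple[Any, List[tracemalloc.StatisticDiff]]:
    """Run `workload` and report the source lines whose allocations changed the most."""
    tracemalloc.start(10)  # keep up to 10 frames per allocation traceback
    try:
        before = tracemalloc.take_snapshot()
        result = workload()  # returned so its allocations stay alive
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Group by file:line; sorted by absolute size change, largest first.
    return result, after.compare_to(before, "lineno")[:limit]


if __name__ == "__main__":
    # Stand-in workload that retains roughly 1 MB.
    kept, stats = top_growth(lambda: [bytearray(1024) for _ in range(1000)])
    for stat in stats:
        print(stat)
```

In a long-running agent the same idea applies with snapshots taken minutes apart; lines whose size_diff keeps climbing between snapshots are the accumulation points.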
Root Cause
In _ffi_client.py:
```python
class FfiQueue(Generic[T]):
    def put(self, item: T) -> None:
        with self._lock:
            for queue, loop in self._subscribers:
                loop.call_soon_threadsafe(queue.put_nowait, item)  # ALL events to ALL subscribers
```
Each call_soon_threadsafe() allocates:
- an asyncio.Handle object
- a copy of the current context (contextvars.copy_context())
- an entry in the subscriber's queue
With N subscribers, every FFI event therefore produces N sets of these objects. AudioStream discards irrelevant events only later, in wait_for(predicate), by which point the objects have already been allocated.
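The fan-out can be reproduced without livekit at all. The toy model below is a hypothetical stand-in for FfiQueue (BroadcastQueue, scheduled and main are illustrative names, not part of livekit-rtc); a worker thread plays the FFI callback thread, and the queue counts how many callbacks it schedules:

```python
import asyncio
import threading
from typing import Generic, List, Tuple, TypeVar

T = TypeVar("T")


class BroadcastQueue(Generic[T]):
    """Toy stand-in for FfiQueue: every put() is delivered to every subscriber."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: List[Tuple[asyncio.Queue, asyncio.AbstractEventLoop]] = []
        self.scheduled = 0  # number of call_soon_threadsafe() calls made

    def subscribe(self, loop: asyncio.AbstractEventLoop) -> "asyncio.Queue[T]":
        queue: asyncio.Queue = asyncio.Queue()
        with self._lock:
            self._subscribers.append((queue, loop))
        return queue

    def put(self, item: T) -> None:
        with self._lock:
            for queue, loop in self._subscribers:
                # One Handle (plus a context copy) per subscriber per event.
                loop.call_soon_threadsafe(queue.put_nowait, item)
                self.scheduled += 1


async def main(num_streams: int = 4, num_events: int = 1000) -> Tuple[int, List[int]]:
    loop = asyncio.get_running_loop()
    bq: BroadcastQueue[str] = BroadcastQueue()
    queues = [bq.subscribe(loop) for _ in range(num_streams)]

    def producer() -> None:
        # Simulates the FFI callback thread pushing events.
        for i in range(num_events):
            bq.put(f"event-{i}")

    # The thread's completion is signalled after all its scheduled callbacks,
    # so every put_nowait has run by the time this await returns.
    await asyncio.to_thread(producer)
    return bq.scheduled, [q.qsize() for q in queues]


if __name__ == "__main__":
    print(asyncio.run(main()))  # (4000, [1000, 1000, 1000, 1000])
```

Four subscribers turn 1,000 events into 4,000 scheduled callbacks, and in this toy nothing drains the queues, so every subscriber also ends up holding all 1,000 events regardless of whether it needs them.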
Proposed Solution
Add event-type filtering to subscribe():
```python
def subscribe(
    self,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    event_types: set[str] | None = None,  # e.g., {"audio_stream_event"}
) -> Queue[T]:
    ...

def put(self, item: T) -> None:
    # Name of the oneof field that is set on this FfiEvent.
    which = item.WhichOneof("message")
    with self._lock:
        for queue, loop, event_types in self._subscribers:
            # Only schedule delivery to subscribers that asked for this event type.
            if event_types is None or which in event_types:
                loop.call_soon_threadsafe(queue.put_nowait, item)
```
Then AudioStream subscribes with:
```python
self._ffi_queue = FfiClient.instance.queue.subscribe(
    self._loop,
    event_types={"audio_stream_event"},
)
```
Impact
- Long-running agents run out of memory (with a 6 GB limit, after ~3.5 hours)
- Event-loop lag grows with memory (by the end of a session, a 60 s timer takes 80 s to fire)
- User-visible latency in real-time features
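For comparison, here is a toy model of the per-subscriber filter proposed above. FakeFfiEvent, FilteringQueue and the "room_event" type name are hypothetical stand-ins (only WhichOneof("message") is modelled), and the 1-in-4 share of audio_stream_event traffic is an arbitrary assumption for illustration:

```python
import asyncio
import threading
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Set, Tuple


@dataclass(frozen=True)
class FakeFfiEvent:
    """Hypothetical stand-in for proto_ffi.FfiEvent; only WhichOneof() is modelled."""

    kind: str

    def WhichOneof(self, oneof_group: str) -> Optional[str]:
        return self.kind if oneof_group == "message" else None


Subscriber = Tuple[asyncio.Queue, asyncio.AbstractEventLoop, Optional[FrozenSet[str]]]


class FilteringQueue:
    """Toy version of the proposed FfiQueue with per-subscriber event-type filters."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: List[Subscriber] = []
        self.scheduled = 0  # call_soon_threadsafe() calls actually made

    def subscribe(
        self,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        event_types: Optional[Set[str]] = None,  # None means "receive everything"
    ) -> asyncio.Queue:
        if loop is None:
            loop = asyncio.get_running_loop()
        queue: asyncio.Queue = asyncio.Queue()
        types = frozenset(event_types) if event_types is not None else None
        with self._lock:
            self._subscribers.append((queue, loop, types))
        return queue

    def put(self, item: FakeFfiEvent) -> None:
        which = item.WhichOneof("message")
        with self._lock:
            for queue, loop, event_types in self._subscribers:
                # Skip subscribers that did not ask for this event type,
                # so no Handle or context copy is created for them.
                if event_types is None or which in event_types:
                    loop.call_soon_threadsafe(queue.put_nowait, item)
                    self.scheduled += 1


async def main(num_streams: int = 4, num_events: int = 1000) -> Tuple[int, List[int]]:
    loop = asyncio.get_running_loop()
    fq = FilteringQueue()
    queues = [fq.subscribe(loop, event_types={"audio_stream_event"}) for _ in range(num_streams)]

    def producer() -> None:
        # Assumed traffic mix: every 4th event is an audio_stream_event.
        for i in range(num_events):
            kind = "audio_stream_event" if i % 4 == 0 else "room_event"
            fq.put(FakeFfiEvent(kind))

    await asyncio.to_thread(producer)  # worker thread plays the FFI callback thread
    return fq.scheduled, [q.qsize() for q in queues]


if __name__ == "__main__":
    print(asyncio.run(main()))  # (1000, [250, 250, 250, 250])
```

With 1,000 events of which 250 are audio_stream_event, four filtered subscribers schedule 1,000 callbacks, where a plain broadcast would schedule 4,000 (one per event per subscriber). One caveat: since every AudioStream subscribes to the same event type, a type filter alone presumably still delivers each stream's audio frames to every AudioStream; removing that remaining fan-out would need a finer key, such as the stream handle.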