Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions livekit-rtc/livekit/rtc/_ffi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import platform
import atexit
import threading
from typing import Generic, List, Optional, TypeVar
from typing import Callable, Generic, List, Optional, TypeVar

from ._proto import ffi_pb2 as proto_ffi
from ._utils import Queue, classproperty
Expand Down Expand Up @@ -114,29 +114,55 @@ def __repr__(self) -> str:
class FfiQueue(Generic[T]):
def __init__(self) -> None:
self._lock = threading.RLock()
self._subscribers: List[tuple[Queue[T], asyncio.AbstractEventLoop]] = []
# Format: (queue, loop, filter_fn or None)
self._subscribers: List[
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Callable[[T], bool]]]
] = []

def put(self, item: T) -> None:
with self._lock:
for queue, loop in self._subscribers:
for queue, loop, filter_fn in self._subscribers:
# If filter provided, skip items that don't match
if filter_fn is not None:
try:
if not filter_fn(item):
continue
except Exception:
pass # On filter error, deliver the item

try:
loop.call_soon_threadsafe(queue.put_nowait, item)
except Exception as e:
# this could happen if user closes the runloop without unsubscribing first
# it's not good when it does occur, but we should not fail the entire runloop
logger.error("error putting to queue: %s", e)

def subscribe(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> Queue[T]:
def subscribe(
self,
loop: Optional[asyncio.AbstractEventLoop] = None,
filter_fn: Optional[Callable[[T], bool]] = None,
) -> Queue[T]:
"""Subscribe to FFI events.

Args:
loop: Event loop to use (defaults to current).
filter_fn: Optional filter function. If provided, only items where
filter_fn(item) returns True will be delivered.
If None, receives all events (original behavior).

Returns:
Queue to receive events from.
"""
with self._lock:
queue = Queue[T]()
loop = loop or asyncio.get_event_loop()
self._subscribers.append((queue, loop))
self._subscribers.append((queue, loop, filter_fn))
return queue

def unsubscribe(self, queue: Queue[T]) -> None:
with self._lock:
# looping here is ok, since we don't expect a lot of subscribers
for i, (q, _) in enumerate(self._subscribers):
for i, (q, _, _) in enumerate(self._subscribers):
if q == queue:
self._subscribers.pop(i)
break
Expand Down
7 changes: 6 additions & 1 deletion livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ def __init__(
self._num_channels = num_channels
self._frame_size_ms = frame_size_ms
self._loop = loop or asyncio.get_event_loop()
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
# Only subscribe to audio_stream_event to avoid unnecessary memory allocations
# from other event types (room_event, track_event, etc.)
self._ffi_queue = FfiClient.instance.queue.subscribe(
self._loop,
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
)
self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)

self._audio_filter_module: str | None = None
Expand Down
7 changes: 6 additions & 1 deletion livekit-rtc/livekit/rtc/video_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ def __init__(
**kwargs,
) -> None:
self._loop = loop or asyncio.get_event_loop()
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
# Only subscribe to video_stream_event to avoid unnecessary memory allocations
# from other event types (room_event, track_event, etc.)
self._ffi_queue = FfiClient.instance.queue.subscribe(
self._loop,
filter_fn=lambda e: e.WhichOneof("message") == "video_stream_event",
)
self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
self._track: Track | None = track
self._format = format
Expand Down
Loading
Loading