Skip to content

Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics)#215

Open
griffinmilsap wants to merge 3 commits intodevfrom
feature/sync-lowlevel-api
Open

Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics)#215
griffinmilsap wants to merge 3 commits intodevfrom
feature/sync-lowlevel-api

Conversation

@griffinmilsap
Copy link
Collaborator

Summary

  • Adds a synchronous low‑level API (SyncContext, SyncPublisher, SyncSubscriber, init, spin, spin_once) so users can publish/subscribe without asyncio.
  • Adds ROS2‑style usage examples for both sync and async APIs.
  • Improves GraphServer auto‑start control via GraphContext(auto_start=...) and GraphService.ensure(auto_start=...).
  • Adds tests for the sync wrapper and a small perf script to quantify overhead.

Motivation

Many users find asyncio intimidating; the goal is to let them use ezmsg with a simple, synchronous API similar to ROS2 (with ez.sync.init(...), spin(), spin_once()), while preserving ezmsg’s backpressure semantics and zero‑copy safety.
This wrapper is explicitly for ergonomics, not peak throughput.

Implementation Details

New Sync API (src/ezmsg/core/sync.py)

  • SyncContext wraps GraphContext and runs an asyncio event loop in a background thread using new_threaded_event_loop.
  • create_publisher / create_subscription call the underlying async API via asyncio.run_coroutine_threadsafe.
  • spin() / spin_once() pull messages directly via recv_zero_copy() and only release backpressure after the user callback returns.
    • This preserves backpressure and avoids the prior “queueing” behavior that could release SHM‑backed messages too early.
  • spin_once() returns a boolean for “did work”.
  • Handles CacheMiss gracefully when a publisher exits under backpressure (stale notifications).

GraphServer Auto‑Start Control

  • GraphService.ensure(auto_start: bool | None = None)
    • None preserves existing “auto‑start only when address is not specified + no env override.”
    • True / False overrides.
  • GraphContext(..., auto_start=...) passes through to GraphService.ensure.
  • ez.sync.init(..., auto_start=...) mirrors GraphContext defaulting to None.

Examples

  • examples/simple_publisher.py
import time
import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
    with ez.sync.init((host, port), auto_start=True) as ctx:
        pub = ctx.create_publisher(TOPIC, force_tcp=True)

        print("Publisher Task Launched")
        count = 0
        try:
            while True:
                output = f"{count=}"
                pub.publish(output)
                print(output)
                time.sleep(0.1)
                count += 1
        except KeyboardInterrupt:
            pass
        print("Publisher Task Concluded")

    print("Done")
 
if __name__ == '__main__':
    main()
  • examples/simple_subscriber.py
import time
import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
    with ez.sync.init((host, port), auto_start=True) as ctx:
        print("Subscriber Task Launched")

        def on_message(msg: str) -> None:
            # Uncomment if you want to witness backpressure!
            # time.sleep(1.0)
            print(msg)

        ctx.create_subscription(TOPIC, callback=on_message)
        ez.sync.spin(ctx)

    print("Subscriber Task Concluded")

if __name__ == '__main__':
    main()
  • examples/simple_async_publisher.py
  • examples/simple_async_subscriber.py

Tests

  • tests/test_sync_api.py

Perf Script

  • tests/perf_sync_overhead.py

Threads and Concurrency Model

  • The sync wrapper runs one background asyncio loop thread for all async work.
  • The calling thread runs user code (spin, spin_once, callbacks).
  • Every publish() / recv() call crosses threads via run_coroutine_threadsafe.
    • This is safe and preserves backpressure, but introduces measurable overhead.

Performance

Measured using tests/perf_sync_overhead.py (local macOS example):

async: 0.0520s total, 5.20 us/msg, 192,203 msg/s
sync : 1.1506s total, 115.06 us/msg, 8,691 msg/s
overhead: 2111.5%

Interpretation:

  • Sync wrapper is significantly slower for micro‑messages due to per‑message thread hops.
  • The sync API is intended for ergonomics, not maximum throughput.
  • For high‑rate paths, the async API remains recommended.

Files Changed / Added

  • Added src/ezmsg/core/sync.py
  • Updated src/ezmsg/core/__init__.py
  • Updated src/ezmsg/core/graphcontext.py
  • Updated src/ezmsg/core/graphserver.py
  • Added examples/simple_publisher.py
  • Added examples/simple_subscriber.py
  • Added examples/simple_async_publisher.py
  • Added examples/simple_async_subscriber.py
  • Added tests/test_sync_api.py
  • Added tests/perf_sync_overhead.py
  • Removed examples/lowlevel.py (replaced by split examples)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant