From 6ac5ee32346ca5c5be3f1516ee50c9f462f35d38 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Fri, 13 Feb 2026 14:25:09 -0500 Subject: [PATCH] zero-copy semantics --- .../explanations/content-explanations.rst | 3 + docs/source/explanations/low-level-api.rst | 5 +- .../transport-messaging-internals.rst | 85 +++++++++++++++++++ docs/source/how-tos/pipeline/unit.rst | 6 +- 4 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 docs/source/explanations/transport-messaging-internals.rst diff --git a/docs/source/explanations/content-explanations.rst b/docs/source/explanations/content-explanations.rst index 2e4e1c1..961d778 100644 --- a/docs/source/explanations/content-explanations.rst +++ b/docs/source/explanations/content-explanations.rst @@ -10,8 +10,11 @@ This section of the documentation aims to provide a comprehensive overview of th ezmsg-design low-level-api + transport-messaging-internals axisarray +.. important:: `ezmsg` delivers subscriber messages with zero-copy semantics in all cases. Treat incoming messages as immutable, and copy data before mutating or republishing. See :doc:`transport-messaging-internals` for details and examples. + Other ways to learn about `ezmsg` include following our :doc:`Tutorial <../tutorials/content-tutorials>`, checking the list of :doc:`HOW TO pages <../how-tos/content-howtos>` and the :doc:`reference documentation <../reference/content-reference>`. diff --git a/docs/source/explanations/low-level-api.rst b/docs/source/explanations/low-level-api.rst index 3162248..34b21c8 100644 --- a/docs/source/explanations/low-level-api.rst +++ b/docs/source/explanations/low-level-api.rst @@ -16,10 +16,11 @@ At its core, `ezmsg` is a publish/subscribe messaging system. - A **publisher** can send messages to **many subscribers**. - A **subscriber** can listen to **many publishers**. - **Channels** route messages between publishers and subscribers; they are **created and managed automatically** when you connect endpoints. 
-- Messages can be any Python object. `AxisArray` is optional, not required. +- Messages can be any Python object. `AxisArray` is optional, not required (but strongly encouraged when it is a good fit). The low-level API gives you direct access to these primitives. You decide **when** to publish, **how** to receive, and **how** to schedule your own control flow. This makes the low-level API a good fit when you want to integrate messaging into an existing application structure instead of adopting the full `ezmsg` pipeline runtime. +For a detailed breakdown of runtime components, transport selection, backpressure, and zero-copy semantics, see :doc:`transport-messaging-internals`. |ezmsg_logo_small| Relationship to the High-level API ****************************************************** @@ -46,6 +47,8 @@ The **high-level API** is a good fit when: - You benefit from the pipeline tooling (graph visualization, CLI integration, etc.). - You want a structured way to scale across threads/processes without managing it yourself. +.. important:: The low-level API is not any more performant than the high-level API. There is no meaningful performance hit when using the high-level API. + |ezmsg_logo_small| Examples ******************************** diff --git a/docs/source/explanations/transport-messaging-internals.rst b/docs/source/explanations/transport-messaging-internals.rst new file mode 100644 index 0000000..faf644e --- /dev/null +++ b/docs/source/explanations/transport-messaging-internals.rst @@ -0,0 +1,85 @@ +Transport and Messaging Internals +################################# + +This page documents the runtime components and data-transport behavior used by `ezmsg`. These internals apply equally to the low-level and high-level APIs. 
+ +GraphServer, Publisher, Subscriber, Channel +=========================================== + +- **GraphServer**: a lightweight TCP service that tracks the topic DAG, keeps a registry of publishers/subscribers, and notifies subscribers when their upstream publishers change. It also brokers shared-memory segment creation and attachment. +- **Publisher**: a client that registers a topic with the GraphServer, opens a channel server (TCP listener), and broadcasts messages. It owns shared-memory buffers and enforces backpressure so buffers are not reused until all subscribers have released a message. +- **Subscriber**: a client that registers a topic with the GraphServer, receives updates that list the upstream publisher IDs it should listen to, and maintains local channels for those publishers. +- **Channel**: a process-local "middle-man" for a single publisher. It receives message telemetry from the publisher (or direct local messages), caches the most recent messages, and notifies all local subscribers in that process. + +Connection Lifecycle (Publisher -> GraphServer -> Subscriber -> Channel) +======================================================================== + +1. A **Publisher** connects to the **GraphServer**, allocates shared-memory buffers, and registers its topic. It starts a small TCP server so Channels can connect back to it, then reports that server's address to the GraphServer. +2. A **Subscriber** connects to the **GraphServer** and registers its topic. The GraphServer computes upstream publishers from the topic DAG and sends the Subscriber an `UPDATE` list of publisher IDs. +3. For each publisher ID, the Subscriber asks the local **ChannelManager** to register a **Channel**. If a Channel does not yet exist in this process, it is created by: +4. The **Channel** requesting a channel allocation from the GraphServer for the given publisher ID. The GraphServer returns a new channel ID plus the publisher's server address. +5. 
The **Channel** connects to the Publisher's channel server, receives the topic and shared-memory name, and attempts to attach to shared memory. It reports whether SHM attach succeeded plus its process ID. +6. The **Publisher** completes the handshake by returning the configured number of buffers. The Channel is now ready to receive messages and notify local subscribers. +7. When graph topology changes (connect/disconnect or new publishers), the GraphServer sends new `UPDATE` messages and Subscribers add or remove Channels as needed. + +Transport Selection: Local, SHM, TCP +==================================== + +`ezmsg` uses the fastest transport available per Publisher/Channel pair: + +- **Local transport (same process)**: the Publisher pushes the object directly into the Channel (`put_local`), and the Channel stores it in the `MessageCache` without serialization. This is the lowest-overhead path. +- **Shared memory (different process, SHM OK)**: the Publisher serializes the object using `MessageMarshal` (pickle protocol 5 with buffer support), writes it into a ring of shared-memory buffers, and notifies the Channel with a `TX_SHM` message. The Channel reads from shared memory using the message ID and caches the deserialized object. +- **TCP (fallback or forced)**: if SHM is unavailable (attach failed, remote host) or `force_tcp=True`, the Publisher sends a `TX_TCP` payload (header + serialized buffers) directly over the channel socket. The Channel deserializes it and caches the result. + +MessageCache and Deserialization Overhead +========================================= + +Every Channel maintains a fixed-size `MessageCache` (one slot per shared-memory buffer). This cache is the key to reducing deserialization overhead: + +- For SHM and TCP, the Channel **deserializes each message once per process** and stores the object in the cache. +- All Subscribers in that process read the same cached object -- they do not deep copy. 
+- When all local Subscribers have released a message, the Channel frees the cache entry and acknowledges the Publisher so the buffer can be reused. + +This design keeps serialization/deserialization out of the hot path for local delivery and prevents repeated deserialization when multiple Subscribers in the same process read the same message. + +Publisher-Channel Buffers and Backpressure +========================================== + +Every Publisher (and thus every Channel) is configured with a fixed number of buffers (`num_buffers`, default 32). These buffers define a ring: + +- Each message gets a monotonically increasing `msg_id`. +- The buffer index is `msg_id % num_buffers`. +- The Channel maintains a `MessageCache` with the same size, so each buffer index maps to one cache slot. + +Backpressure is the mechanism that prevents a buffer slot from being overwritten while any subscriber still needs its message: + +- The **Publisher** checks whether the next buffer index is free. If not, it waits until all leases for that buffer are released. +- When the **Channel** notifies local subscribers of a new message, it **leases** that buffer index for each subscriber. This records "who still needs this message." +- When a subscriber finishes reading the message (or drops it in leaky mode), the Channel **releases** that subscriber’s lease. +- Once all leases for that buffer index are released, the Channel clears the cache entry and acknowledges the Publisher so it can reuse the slot. + +Backpressure works the same way for local, SHM, and TCP delivery. The transport only affects how the Channel receives the message bytes; buffer ownership and release are always enforced by the same lease/ack mechanism. + +Zero-copy Semantics and Message Ownership +========================================= + +Publishers in `ezmsg` serialize messages to shared memory, and eventually to a module-scoped **MessageCache** which is shared by several Subscribers. 
Subscribers receive a "zero-copy" view of this message that is: + +- The originally published object itself if local transport was used. +- Backed by Publisher-controlled shared memory if SHM transport was used. +- A shared/cached version of the deserialized object if TCP transport was used. + +This results in very low messaging overhead and high messaging performance, but it comes with **some important safety considerations.** + +.. important:: Treat incoming messages as **immutable**. If you need to modify data or republish it, do **not** modify data in place without a very strong understanding of the implications. Generally, you should **create a new message or copy the payload first**. Do **not** republish the same object instance you received. + +Why this matters (two concrete failure modes): + +1. **Mutating a locally cached message affects other subscribers.** Example: Publisher A and Subscribers B/C/D are in the same process. If Subscriber B mutates the message it received, Subscribers C and D will see those changes because they are reading the same cached object. +2. **Republishing a shared-memory-backed message can corrupt downstream readers.** Example: Publisher A (Process X) publishes an `AxisArray` into shared memory, Subscriber B (Process Y) receives it, then republishes *the same object* via Publisher C to Subscriber D (also in Process Y). Because Publisher C and Subscriber D are in the same process, the object is passed via the local cache. Meanwhile, Publisher A sees the shared-memory buffer as free (because Subscriber B has acknowledged receipt and backpressure has been released) and overwrites it, so Subscriber D now observes overwritten data. + +In the low-level API, subscriber messages are received via `Subscriber.recv()` or `Subscriber.recv_zero_copy()`. `recv()` wraps `recv_zero_copy()` and returns a deep copy of the message; `recv_zero_copy()` yields the shared cached object.
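The two failure modes above can be reproduced without `ezmsg` at all. The following is a minimal sketch using only the Python standard library; it does not call any `ezmsg` API. The names `cache`, `read_shared`, `read_copy`, and `ring_slot` are hypothetical stand-ins for a process-local message cache, a zero-copy read, a copying read, and one reusable shared-memory buffer, respectively.

```python
from copy import deepcopy

# Hypothetical stand-in for a process-local message cache: one slot, one object.
cache = {0: {"samples": [1.0, 2.0, 3.0]}}


def read_shared(slot: int) -> dict:
    """Return the cached object itself (analogous to a zero-copy read)."""
    return cache[slot]


def read_copy(slot: int) -> dict:
    """Return an independent deep copy (analogous to a copying read)."""
    return deepcopy(cache[slot])


# Failure mode 1: an in-place edit through a shared reference
# is visible to every other reader of the same cached object.
msg_b = read_shared(0)
msg_c = read_shared(0)
msg_b["samples"][0] = -99.0
shared_leak = msg_c["samples"][0]  # reader C sees reader B's edit

# Copying first keeps the edit private to the reader that made it.
cache[0] = {"samples": [1.0, 2.0, 3.0]}
msg_b = read_copy(0)
msg_c = read_shared(0)
msg_b["samples"][0] = -99.0
copy_isolated = msg_c["samples"][0]  # reader C still sees the original value

# Failure mode 2: a view backed by a reusable buffer changes
# when the owner of that buffer overwrites it.
ring_slot = bytearray(b"AAAA")       # stand-in for one shared-memory buffer
held_view = memoryview(ring_slot)    # a "message" that still points at the buffer
snapshot = bytes(held_view)          # an owned copy taken before the slot is reused
ring_slot[:] = b"BBBB"               # the buffer owner reuses the slot (same size)
view_after_reuse = bytes(held_view)  # the held view now shows the new contents
```

In this sketch, the copy taken before mutation or reuse (`read_copy`, `snapshot`) is what keeps a reader isolated, which mirrors the advice to copy the payload before modifying or republishing a received message.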
+ +**The high-level `ezmsg` API uses zero-copy semantics for subscriber delivery in all cases. This has been the effective behavior since 2023 (V3.2.0+) and is now the documented behavior.** This specifically applies to coroutines wrapped with the high-level `@ez.subscriber` decorator (the `message` you receive **is that shared object**). + +.. note:: The `zero_copy` argument (default `False`) on `@ez.subscriber` previously controlled whether a deepcopy was performed before delivering the message. As detailed in `issue #209 `_, a backend change in early 2023 (V3.2.0+) resulted in this argument being ignored, so all coroutines decorated with `@ez.subscriber` receive zero-copy input messages. The maintainers have decided this is preferable behavior because it optimizes message throughput at the expense of code safety. In an upcoming version of `ezmsg`, the `zero_copy` keyword argument will be formally deprecated/retired since it no longer has any effect. If you would like to restore the safety previously guaranteed by `zero_copy=False`, add `message = deepcopy(message)` as the first line in your coroutine. diff --git a/docs/source/how-tos/pipeline/unit.rst b/docs/source/how-tos/pipeline/unit.rst index 3fc2a23..4c10544 100644 --- a/docs/source/how-tos/pipeline/unit.rst +++ b/docs/source/how-tos/pipeline/unit.rst @@ -177,10 +177,12 @@ There are a few important notes to remember when implementing these Unit methods - Each method must be asynchronous to allow for non-blocking message processing. This means that each method must be defined with the ``async def`` keywords, not ``def``. - Each method should be decorated with the appropriate decorators (see :ref:`decorators`). The most important decorators are ``@ez.subscriber`` and ``@ez.publisher``, which are used to subscribe to input streams and publish to output streams, respectively. These two decorators should take the relevant streams as arguments. 
- Each publishing method must use ``yield`` to produce output messages. The syntax of the yield statement should be ``yield self.OUTPUT_STREAM, MessageType(...)``, where ``self.OUTPUT_STREAM`` is the output stream you are publishing to, and ``MessageType(...)`` is the message you are sending. -- Each subscribing method must take in a message parameter which will receive the incoming message from the input stream. The method signature should be ``async def method_name(self, message)``. Additionally, in the ``ez.subscriber`` decorator, you can specify the keyword boolean argument ``zero_copy`` to indicate whether you want to receive a zero-copy reference (``zero_copy=True``) to the message (if supported by the message type) or a copy of the message. The default is ``zero_copy=False``. +- Each subscribing method must take in a message parameter which will receive the incoming message from the input stream. The method signature should be ``async def method_name(self, message)``. - If a method is to stop processing and terminate normally, it should raise the ``ez.NormalTermination`` exception. This indicates to ezmsg that the Unit has completed its task and can be safely terminated. - There are other decorators available for other purposes. See :ref:`decorators` for more details. Note, one can stack decorators. +.. important:: For performance reasons, ezmsg delivers subscriber messages with zero-copy semantics in all cases. **Treat the incoming message as immutable; if you need to modify or republish it, copy first.** You may come across code with a ``zero_copy=True`` keyword argument specified in the ``@ez.subscriber`` decorator, which is now ignored. See :doc:`../../explanations/transport-messaging-internals` for details. + With these components discussed, we can see the example from this question again: .. code-block:: python @@ -215,4 +217,4 @@ When a Unit is initialised within a pipeline, ezmsg takes care of setting up the ..
|ezmsg_logo_small| image:: ../../_static/_images/ezmsg_logo.png :width: 40 - :alt: ezmsg logo \ No newline at end of file + :alt: ezmsg logo