From 0ff7ca61738d4c87a3661d7de6eb19be48b9d7ec Mon Sep 17 00:00:00 2001 From: Kevin Welsh Date: Mon, 28 Apr 2025 10:25:40 -0400 Subject: [PATCH 1/3] Create ConcurrentProxy that automatically establishes connections in new threads. --- Pyro5/api.py | 4 ++-- Pyro5/client.py | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/Pyro5/api.py b/Pyro5/api.py index eaef1aa..51665e5 100644 --- a/Pyro5/api.py +++ b/Pyro5/api.py @@ -11,7 +11,7 @@ from . import __version__ from .configure import global_config as config from .core import URI, locate_ns, resolve, type_meta -from .client import Proxy, BatchProxy, SerializedBlob +from .client import Proxy, BatchProxy, ConcurrentProxy, SerializedBlob from .server import Daemon, DaemonObject, callback, expose, behavior, oneway, serve from .nameserver import start_ns, start_ns_loop from .serializers import SerializerBase @@ -24,7 +24,7 @@ __all__ = ["config", "URI", "locate_ns", "resolve", "type_meta", "current_context", - "Proxy", "BatchProxy", "SerializedBlob", "SerializerBase", + "Proxy", "BatchProxy", "ConcurrentProxy", "SerializedBlob", "SerializerBase", "Daemon", "DaemonObject", "callback", "expose", "behavior", "oneway", "start_ns", "start_ns_loop", "serve", "register_dict_to_class", "register_class_to_dict", "unregister_dict_to_class", "unregister_class_to_dict"] diff --git a/Pyro5/client.py b/Pyro5/client.py index ee0e599..bc1b748 100644 --- a/Pyro5/client.py +++ b/Pyro5/client.py @@ -9,6 +9,8 @@ import logging import serpent import contextlib +from typing import Any +from collections import defaultdict from . import config, core, serializers, protocol, errors, socketutil from .callcontext import current_context try: @@ -626,6 +628,44 @@ def _pyroInvoke(self, name, args, kwargs): results = self.__proxy._pyroInvokeBatch(self.__calls) self.__calls = [] # clear for re-use return self.__resultsgenerator(results) + + +class ConcurrentProxy(Proxy): + """ + Proxy for remote python objects. The `Proxy` must be explicitly passed across threads. This class handles automatically + creating new proxies for the current thread when necessary. + """ + + OBJECT_THREAD_MAP: dict[core.URI, dict[Any, "ConcurrentProxy"]] = defaultdict(dict) + + def __init__(self, uri: str, **kwargs): + super().__init__(uri, **kwargs) + ConcurrentProxy.OBJECT_THREAD_MAP[self._pyroUri][get_ident()] = self + + def _pyroInvoke(self, methodname, vargs, kwargs, flags=0, objectId=None): + proxy = self + if self._pyroUri in ConcurrentProxy.OBJECT_THREAD_MAP: + thread_map = ConcurrentProxy.OBJECT_THREAD_MAP[self._pyroUri] + thread_id = get_ident() + + # Copy this proxy for the current thread if it doesn't + if thread_id not in thread_map: + thread_map[thread_id] = self.__copy__() + + proxy = thread_map[thread_id] + + return Proxy._pyroInvoke(proxy, methodname, vargs, kwargs, flags, objectId) + + def __del__(self): + # All threads hold a referenece to this object. So this should only be called when all + # proxies for all threads are garbage collected. + try: + ConcurrentProxy.OBJECT_THREAD_MAP.pop(self._pyroUri, None) + super().__del__() + except Exception: + pass + + class SerializedBlob(object): From 139bf383ac6b6d6195517119ec77cbe7de147ac0 Mon Sep 17 00:00:00 2001 From: Kevin Welsh Date: Mon, 28 Apr 2025 10:26:52 -0400 Subject: [PATCH 2/3] Create a Functor class that can be used to wrap single functions without a class context. --- Pyro5/api.py | 4 ++-- Pyro5/client.py | 1 + Pyro5/server.py | 24 ++++++++++++++++++++++-- tests/test_server.py | 1 - 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/Pyro5/api.py b/Pyro5/api.py index 51665e5..cb5dd61 100644 --- a/Pyro5/api.py +++ b/Pyro5/api.py @@ -12,7 +12,7 @@ from .configure import global_config as config from .core import URI, locate_ns, resolve, type_meta from .client import Proxy, BatchProxy, ConcurrentProxy, SerializedBlob -from .server import Daemon, DaemonObject, callback, expose, behavior, oneway, serve +from .server import Daemon, DaemonObject, callback, expose, behavior, oneway, serve, Functor from .nameserver import start_ns, start_ns_loop from .serializers import SerializerBase from .callcontext import current_context @@ -25,6 +25,6 @@ __all__ = ["config", "URI", "locate_ns", "resolve", "type_meta", "current_context", "Proxy", "BatchProxy", "ConcurrentProxy", "SerializedBlob", "SerializerBase", - "Daemon", "DaemonObject", "callback", "expose", "behavior", "oneway", + "Daemon", "DaemonObject", "callback", "expose", "behavior", "oneway", "Functor", "start_ns", "start_ns_loop", "serve", "register_dict_to_class", "register_class_to_dict", "unregister_dict_to_class", "unregister_class_to_dict"] diff --git a/Pyro5/client.py b/Pyro5/client.py index bc1b748..0660dbb 100644 --- a/Pyro5/client.py +++ b/Pyro5/client.py @@ -182,6 +182,7 @@ def __dir__(self): # obj.__getitem__(index)), the special methods are not looked up via __getattr__ # for efficiency reasons; instead, their presence is checked directly. # Thus we need to define them here to force (remote) lookup through __getitem__. + def __call__(self, *args, **kwargs): return self.__getattr__('__call__')(*args, **kwargs) def __bool__(self): return True def __len__(self): return self.__getattr__('__len__')() def __getitem__(self, index): return self.__getattr__('__getitem__')(index) diff --git a/Pyro5/server.py b/Pyro5/server.py index 6c91fda..934ea90 100644 --- a/Pyro5/server.py +++ b/Pyro5/server.py @@ -17,7 +17,7 @@ import weakref import serpent import ipaddress -from typing import TypeVar, Tuple, Union, Optional, Dict, Any, Sequence, Set +from typing import Generic, ParamSpec, TypeVar, Tuple, Union, Optional, Dict, Any, Sequence, Set from . import config, core, errors, serializers, socketutil, protocol, client from .callcontext import current_context from collections.abc import Callable @@ -29,7 +29,7 @@ _private_dunder_methods = frozenset([ "__init__", "__init_subclass__", "__class__", "__module__", "__weakref__", - "__call__", "__new__", "__del__", "__repr__", + "__new__", "__del__", "__repr__", "__str__", "__format__", "__nonzero__", "__bool__", "__coerce__", "__cmp__", "__eq__", "__ne__", "__hash__", "__ge__", "__gt__", "__le__", "__lt__", "__dir__", "__enter__", "__exit__", "__copy__", "__deepcopy__", "__sizeof__", @@ -138,6 +138,26 @@ def _behavior(clazz): return _behavior +ReturnType = TypeVar("ReturnType") +ParamsTypes = ParamSpec("ParamsTypes") + +@expose +class Functor(Generic[ParamsTypes, ReturnType]): + """ + A functor is a callable object that can be used as a function. + This is used to wrap functions that are not methods of a class. + """ + + def __init__(self, func: Callable[ParamsTypes, ReturnType]): + self.func = func + + @expose + def __call__( + self, *args: ParamsTypes.args, **kwargs: ParamsTypes.kwargs + ) -> ReturnType: + return self.func(*args, **kwargs) + + @expose class DaemonObject(object): """The part of the daemon that is exposed as a Pyro object.""" diff --git a/tests/test_server.py b/tests/test_server.py index da79784..ff81a6c 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1032,7 +1032,6 @@ def testIsPrivateName(self): assert Pyro5.server.is_private_attribute("___p") assert not Pyro5.server.is_private_attribute("__dunder__") # dunder methods should not be private except a list of exceptions as tested below assert Pyro5.server.is_private_attribute("__init__") - assert Pyro5.server.is_private_attribute("__call__") assert Pyro5.server.is_private_attribute("__new__") assert Pyro5.server.is_private_attribute("__del__") assert Pyro5.server.is_private_attribute("__repr__") From d8f69fd4c4a93cfe078a5a7ebb2beee12c9cfda0 Mon Sep 17 00:00:00 2001 From: Kevin Welsh Date: Mon, 28 Apr 2025 11:11:59 -0400 Subject: [PATCH 3/3] Switch to threadlocal instead of a dictionary --- Pyro5/client.py | 39 +++++++++------------------------------ 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/Pyro5/client.py b/Pyro5/client.py index 0660dbb..1333031 100644 --- a/Pyro5/client.py +++ b/Pyro5/client.py @@ -9,7 +9,7 @@ import logging import serpent import contextlib -from typing import Any +from threading import local from collections import defaultdict from . import config, core, serializers, protocol, errors, socketutil from .callcontext import current_context @@ -630,44 +630,23 @@ def _pyroInvoke(self, name, args, kwargs): self.__calls = [] # clear for re-use return self.__resultsgenerator(results) - class ConcurrentProxy(Proxy): """ Proxy for remote python objects. The `Proxy` must be explicitly passed across threads. This class handles automatically creating new proxies for the current thread when necessary. """ - - OBJECT_THREAD_MAP: dict[core.URI, dict[Any, "ConcurrentProxy"]] = defaultdict(dict) - + THREAD_PROXY_MAP = defaultdict(local) def __init__(self, uri: str, **kwargs): super().__init__(uri, **kwargs) - ConcurrentProxy.OBJECT_THREAD_MAP[self._pyroUri][get_ident()] = self + ConcurrentProxy.THREAD_PROXY_MAP[self._pyroUri].proxy = self def _pyroInvoke(self, methodname, vargs, kwargs, flags=0, objectId=None): - proxy = self - if self._pyroUri in ConcurrentProxy.OBJECT_THREAD_MAP: - thread_map = ConcurrentProxy.OBJECT_THREAD_MAP[self._pyroUri] - thread_id = get_ident() - - # Copy this proxy for the current thread if it doesn't - if thread_id not in thread_map: - thread_map[thread_id] = self.__copy__() - - proxy = thread_map[thread_id] - - return Proxy._pyroInvoke(proxy, methodname, vargs, kwargs, flags, objectId) - - def __del__(self): - # All threads hold a referenece to this object. So this should only be called when all - # proxies for all threads are garbage collected. - try: - ConcurrentProxy.OBJECT_THREAD_MAP.pop(self._pyroUri, None) - super().__del__() - except Exception: - pass - - - + local_data = ConcurrentProxy.THREAD_PROXY_MAP[self._pyroUri] + if not hasattr(local_data, "proxy"): + local_data.proxy = self.__copy__() + return Proxy._pyroInvoke( + local_data.proxy, methodname, vargs, kwargs, flags, objectId + ) class SerializedBlob(object): """