Source code for caniusethat.thing

import types
from threading import Condition
from typing import Any, Callable, List

import zmq
from zmq.utils.win32 import allow_interrupt

from caniusethat._logging import getLogger
from caniusethat._types import SharedMethodDescriptor
from caniusethat.rpc_utils import prepare_rpc_pickle, validate_rpc_response

_logger = getLogger(__name__)


[docs]class Thing: """A representation of a remote object, or `thing`, that has methods that can be called. Attributes: name: The unique name of the remote object. server_address: The address of the server that is hosting the remote object. Example: >>> from caniusethat import thing >>> my_thing = thing.Thing("remote_calculator", "tcp://127.0.0.1:6555") >>> my_thing.add(2, 3) 5 """ _RESERVED_NAMES = ["available_methods", "close_this_thing"] _LINGER_TIME = 1000 # ms def __init__(self, name: str, server_address: str) -> None: self.name = name self._rpc_condition = Condition() _logger.info(f"Connecting to 👀 caniusethat server at {server_address}...") context = zmq.Context.instance() self.request_socket: zmq.Socket = context.socket(zmq.REQ) self.request_socket.connect(server_address) self.poller = zmq.Poller() self.poller.register(self.request_socket, zmq.POLLIN) self._methods = self._get_object_description_from_server() self._populate_methods_from_description() self._closed = False def _make_method_fn(self, name: str) -> Callable: return lambda _self, *args, **kwargs: self._make_rpc_and_validate_response( _self.name, name, *args, **kwargs ) def _socket_receive(self) -> bytes: """Receives a message from the server.""" with allow_interrupt(self.close_this_thing): socks = dict(self.poller.poll()) if not ( self.request_socket in socks and socks[self.request_socket] == zmq.POLLIN ): raise RuntimeError( f"Poller returned incorrect socket or event: {socks}" ) return self.request_socket.recv() def _make_rpc_and_validate_response( self, name: str, method: str, *args, **kwargs ) -> Any: rpc_pickle = prepare_rpc_pickle(name, method, args, kwargs) with self._rpc_condition: self.request_socket.send(rpc_pickle) socket_response = self._socket_receive() return validate_rpc_response(socket_response) def _get_object_description_from_server(self) -> List[SharedMethodDescriptor]: """Gets the description of the remote object from the server.""" object_description = self._make_rpc_and_validate_response( "_server", "get_object_methods", self.name ) if not isinstance(object_description, list): raise RuntimeError( f"Received invalid RemoteProcedureResponse: {object_description}" ) for method_descriptor in object_description: if not isinstance(method_descriptor, SharedMethodDescriptor): raise RuntimeError( f"Received invalid RemoteProcedureResponse: {object_description}" ) return object_description def _populate_methods_from_description(self) -> None: """Populates the methods of this object from the description of the remote object.""" for name, signature, docstring in self._methods: if name in self._RESERVED_NAMES: raise RuntimeError( f"Method name `{name}` is reserved for internal use, please change it in the remote class." ) _logger.debug(f"Adding method {name}({signature})") method_fn = self._make_method_fn(name) method_fn.__name__ = name method_fn.__signature__ = signature # type: ignore method_fn.__doc__ = signature + "\n" + docstring setattr(self, name, types.MethodType(method_fn, self))
[docs] def available_methods(self) -> List[SharedMethodDescriptor]: """Returns a list of the available methods of this object.""" return self._methods
[docs] def close_this_thing(self) -> None: """Closes the connection to the remote object and server, releasing any locks if any are still held.""" if not self._closed: _logger.info("Closing connection to 👀 caniusethat server") try: _ = self._make_rpc_and_validate_response( "_server", "release_lock_if_any", self.name ) except Exception: _logger.exception( "There was an error when trying to remove locks on the Thing." ) self.poller.unregister(self.request_socket) self.request_socket.close(linger=self._LINGER_TIME) self._closed = True else: RuntimeError("Connection to 👀 caniusethat server already closed.")