Source code for compactor.process

import functools

from .context import Context
from .pid import PID


[docs]class Process(object): class Error(Exception): pass class UnboundProcess(Error): pass ROUTE_ATTRIBUTE = '__route__' INSTALL_ATTRIBUTE = '__mailbox__' @classmethod
[docs] def route(cls, path): """A decorator to indicate that a method should be a routable HTTP endpoint. .. code-block:: python from compactor.process import Process class WebProcess(Process): @Process.route('/hello/world') def hello_world(self, handler): return handler.write('<html><title>hello world</title></html>') The handler passed to the method is a tornado RequestHandler. WARNING: This interface is alpha and may change in the future if or when we remove tornado as a compactor dependency. :param path: The endpoint to route to this method. :type path: ``str`` """ if not path.startswith('/'): raise ValueError('Routes must start with "/"') def wrap(fn): setattr(fn, cls.ROUTE_ATTRIBUTE, path) return fn return wrap # TODO(wickman) Make mbox optional, defaulting to function.__name__. # TODO(wickman) Make INSTALL_ATTRIBUTE a defaultdict(list) so that we can # route multiple endpoints to a single method.
@classmethod
[docs] def install(cls, mbox): """A decorator to indicate a remotely callable method on a process. .. code-block:: python from compactor.process import Process class PingProcess(Process): @Process.install('ping') def ping(self, from_pid, body): # do something The installed method should take ``from_pid`` and ``body`` parameters. ``from_pid`` is the process calling the method. ``body`` is a ``bytes`` stream that was delivered with the message, possibly empty. :param mbox: Incoming messages to this "mailbox" will be dispatched to this method. :type mbox: ``str`` """ def wrap(fn): setattr(fn, cls.INSTALL_ATTRIBUTE, mbox) return fn return wrap
[docs] def __init__(self, name): """Create a process with a given name. The process must still be bound to a context before it can send messages or link to other processes. :param name: The name of this process. :type name: ``str`` """ self.name = name self._delegates = {} self._http_handlers = dict(self.iter_routes()) self._message_handlers = dict(self.iter_handlers()) self._context = None
def __iter_callables(self): # iterate over the methods in a way where we can differentiate methods from descriptors for method in type(self).__dict__.values(): if callable(method): # 'method' is the unbound method on the class -- we want to return the bound instancemethod yield getattr(self, method.__name__) def iter_routes(self): for function in self.__iter_callables(): if hasattr(function, self.ROUTE_ATTRIBUTE): yield getattr(function, self.ROUTE_ATTRIBUTE), function def iter_handlers(self): for function in self.__iter_callables(): if hasattr(function, self.INSTALL_ATTRIBUTE): yield getattr(function, self.INSTALL_ATTRIBUTE), function def _assert_bound(self): if not self._context: raise self.UnboundProcess('Cannot get pid of unbound process.') def bind(self, context): if not isinstance(context, Context): raise TypeError('Can only bind to a Context, got %s' % type(context)) self._context = context @property
[docs] def pid(self): """The pid of this process. :raises: Will raise a ``Process.UnboundProcess`` exception if the process is not bound to a context. """ self._assert_bound() return PID(self._context.ip, self._context.port, self.name)
@property
[docs] def context(self): """The context that this process is bound to. :raises: Will raise a ``Process.UnboundProcess`` exception if the process is not bound to a context. """ self._assert_bound() return self._context
@property def route_paths(self): return self._http_handlers.keys() @property def message_names(self): return self._message_handlers.keys() def delegate(self, name, pid): self._delegates[name] = pid def handle_message(self, name, from_pid, body): if name in self._message_handlers: self._message_handlers[name](from_pid, body) elif name in self._delegates: to = self._delegates[name] self._context.transport(to, name, body, from_pid) def handle_http(self, route, handler, *args, **kw): return self._http_handlers[route](handler, *args, **kw)
[docs] def initialize(self): """Called when this process is spawned. Once this is called, it means a process is now routable. Subclasses should implement this to initialize state or possibly initiate connections to remote processes. """
[docs] def exited(self, pid): """Called when a linked process terminates or its connection is severed. :param pid: The pid of the linked process. :type pid: :class:`PID` """
[docs] def send(self, to, method, body=None): """Send a message to another process. Sending messages is done asynchronously and is not guaranteed to succeed. Returns immediately. :param to: The pid of the process to send a message. :type to: :class:`PID` :param method: The method/mailbox name of the remote method. :type method: ``str`` :keyword body: The optional content to send with the message. :type body: ``bytes`` or None :raises: Will raise a ``Process.UnboundProcess`` exception if the process is not bound to a context. :return: Nothing """ self._assert_bound() self._context.send(self.pid, to, method, body)
[docs] def terminate(self): """Terminate this process. This unbinds it from the context to which it is bound. :raises: Will raise a ``Process.UnboundProcess`` exception if the process is not bound to a context. """ self._assert_bound() self._context.terminate(self.pid)
[docs]class ProtobufProcess(Process): @classmethod
[docs] def install(cls, message_type): """A decorator to indicate a remotely callable method on a process using protocol buffers. .. code-block:: python from compactor.process import ProtobufProcess from messages_pb2 import RequestMessage, ResponseMessage class PingProcess(ProtobufProcess): @ProtobufProcess.install(RequestMessage) def ping(self, from_pid, message): # do something with message, a RequestMessage response = ResponseMessage(...) # send a protocol buffer which will get serialized on the wire. self.send(from_pid, response) The installed method should take ``from_pid`` and ``message`` parameters. ``from_pid`` is the process calling the method. ``message`` is a protocol buffer of the installed type. :param message_type: Incoming messages to this message_type will be dispatched to this method. :type message_type: A generated protocol buffer stub """ def wrap(fn): @functools.wraps(fn) def wrapped_fn(self, from_pid, message_str): message = message_type() message.MergeFromString(message_str) return fn(self, from_pid, message) return Process.install(message_type.DESCRIPTOR.full_name)(wrapped_fn) return wrap
[docs] def send(self, to, message): """Send a message to another process. Same as ``Process.send`` except that ``message`` is a protocol buffer. Returns immediately. :param to: The pid of the process to send a message. :type to: :class:`PID` :param message: The message to send :type method: A protocol buffer instance. :raises: Will raise a ``Process.UnboundProcess`` exception if the process is not bound to a context. :return: Nothing """ super(ProtobufProcess, self).send(to, message.DESCRIPTOR.full_name, message.SerializeToString())