# Source code for compactor.context

"""Context controls the routing and handling of messages between processes."""

import logging
import socket
import threading
import os
try:
  import asyncio
except ImportError:
  import trollius as asyncio

from collections import defaultdict
from functools import partial

from .httpd import HTTPD
from .request import encode_request

from tornado import stack_context
from tornado.iostream import IOStream
from tornado.netutil import bind_sockets
from tornado.platform.asyncio import BaseAsyncIOLoop

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class Context(threading.Thread):
  """A compactor context.

  Compactor contexts control the routing and handling of messages between
  processes.  At its most basic level, a context is a listening (ip, port)
  pair and an event loop.

  The context is itself a daemon thread: ``start`` launches the thread, whose
  ``run`` method builds the event loop and the HTTP server and then serves
  until ``stop`` is called.
  """

  class Error(Exception):
    """Base class for all errors raised by a context."""

  class SocketError(Error):
    """Raised when an outgoing socket/stream cannot be set up."""

  class InvalidProcess(Error):
    """Raised when an operation references a pid not bound to this context."""

  class InvalidMethod(Error):
    """Raised when dispatching to a method the target process does not have."""

  # Process-wide shared context (see ``singleton``) and the lock guarding it.
  _SINGLETON = None
  _LOCK = threading.Lock()

  CONNECT_TIMEOUT_SECS = 5

  @classmethod
  def _make_socket(cls, ip, port):
    """Bind a listening socket on ``(ip, port)``.

    :param ip: The address to bind to.
    :param port: The port to bind to (0 selects an ephemeral port).
    :return: A ``(socket, ip, port)`` tuple describing what was actually bound.
      If the socket is bound to the wildcard address, the returned ip is the
      one this host's name resolves to.
    """
    sockets = bind_sockets(port, address=ip)
    bound_socket = sockets[0]

    # bind_sockets may return one socket per resolved address.  Only the first
    # is ever served, so close the others rather than leaking bound listeners.
    for extra in sockets[1:]:
      extra.close()

    # IPv6 sockets report a 4-tuple; only the (host, port) prefix is needed.
    ip, port = bound_socket.getsockname()[:2]
    if not ip or ip == '0.0.0.0':
      ip = socket.gethostbyname(socket.gethostname())

    return bound_socket, ip, port

  @classmethod
  def get_ip_port(cls, ip=None, port=None):
    """Resolve the (ip, port) pair a context should listen on.

    Explicit arguments win; otherwise the ``LIBPROCESS_IP`` and
    ``LIBPROCESS_PORT`` environment variables are consulted, falling back to
    ``'0.0.0.0'`` and ``0`` respectively.

    :return: An ``(ip, port)`` tuple where port is an ``int``.
    :raises Context.Error: If the port cannot be interpreted as an integer.
    """
    ip = ip or os.environ.get('LIBPROCESS_IP', '0.0.0.0')
    try:
      port = int(port or os.environ.get('LIBPROCESS_PORT', 0))
    except ValueError:
      raise cls.Error('Invalid ip/port provided')
    return ip, port

  @classmethod
  def singleton(cls, delegate='', **kw):
    """Return the shared context, constructing and starting it on first use.

    :keyword delegate: The delegate the shared context is (or must have been)
      constructed with.
    :raises RuntimeError: If a shared context already exists with a different
      delegate.
    """
    with cls._LOCK:
      if cls._SINGLETON is not None:
        if cls._SINGLETON.delegate != delegate:
          raise RuntimeError('Attempting to construct different singleton context.')
      else:
        cls._SINGLETON = cls(delegate=delegate, **kw)
        cls._SINGLETON.start()
      return cls._SINGLETON

  def __init__(self, delegate='', loop=None, ip=None, port=None):
    """Construct a compactor context.

    Before any useful work can be done with a context, you must call
    ``start`` on the context.  Note that the listening socket is bound here,
    at construction time.

    :keyword delegate: Stored on the context as ``delegate``; ``singleton``
      uses it to detect conflicting requests for the shared context.
    :keyword loop: An event loop to run on.  If none is specified, a new
      asyncio event loop is created when the context thread runs.
    :keyword ip: The ip of the interface on which the Context should listen.
      If none is specified, the context will attempt to bind to the ip
      specified by the ``LIBPROCESS_IP`` environment variable.  If this
      variable is not set, it will bind on all interfaces.
    :type ip: ``str`` or None
    :keyword port: The port on which the Context should listen.  If none is
      specified, the context will attempt to bind to the port specified by the
      ``LIBPROCESS_PORT`` environment variable.  If this variable is not set,
      it will bind to an ephemeral port.
    :type port: ``int`` or None
    """
    self._processes = {}
    self._links = defaultdict(set)
    self.delegate = delegate
    # Both are populated by run() once the context thread is up.
    self.__loop = self.http = None
    self.__event_loop = loop
    self._ip = None
    ip, port = self.get_ip_port(ip, port)
    self.__sock, self.ip, self.port = self._make_socket(ip, port)
    # pid -> established stream, and pid -> callbacks awaiting a connection.
    self._connections = {}
    self._connection_callbacks = defaultdict(list)
    self._connection_callbacks_lock = threading.Lock()
    self.__context_name = 'CompactorContext(%s:%d)' % (self.ip, self.port)
    super(Context, self).__init__(name=self.__context_name)
    self.daemon = True
    self.lock = threading.Lock()
    self.__id = 1
    self.__loop_started = threading.Event()

  def _assert_started(self):
    """Raise ``AssertionError`` unless the event loop has been started."""
    # Raised explicitly (rather than via ``assert``) so that the check still
    # runs when Python is started with -O.
    if not self.__loop_started.is_set():
      raise AssertionError('Context has not been started.')

  def start(self):
    """Start the context.

    This method must be called before calls to ``send`` and ``spawn``.

    It launches the context thread and returns as soon as that thread has set
    up its event loop; it does not block while the loop is running.
    """
    super(Context, self).start()
    self.__loop_started.wait()

  def __debug(self, msg):
    """Log ``msg`` at debug level, prefixed with this context's name."""
    log.debug('%s: %s', self.__context_name, msg)

  def run(self):
    # The entry point of the Context thread.  This should not be called directly.
    loop = self.__event_loop or asyncio.new_event_loop()

    class CustomIOLoop(BaseAsyncIOLoop):
      # Wraps the chosen asyncio loop without taking ownership of closing it.
      def initialize(self):
        super(CustomIOLoop, self).initialize(loop, close_loop=False)

    self.__loop = CustomIOLoop()
    self.http = HTTPD(self.__sock, self.__loop)

    # Unblocks start(); from here on _assert_started() passes.
    self.__loop_started.set()

    self.__loop.start()
    self.__loop.close()

  def _is_local(self, pid):
    """Return whether ``pid`` is bound to this context."""
    return pid in self._processes

  def _assert_local_pid(self, pid):
    """Raise ``InvalidProcess`` unless ``pid`` is bound to this context."""
    if not self._is_local(pid):
      raise self.InvalidProcess('Operation only valid for local processes!')

  def stop(self):
    """Stops the context.  This terminates all PIDs and closes all connections."""

    log.info('Stopping %s', self)

    # Clean up the context.  Iterate over a snapshot because terminate()
    # removes entries from _processes.
    for pid in list(self._processes):
      self.terminate(pid)

    while self._connections:
      pid = next(iter(self._connections))
      conn = self._connections.pop(pid, None)
      if conn:
        conn.close()

    # The loop only exists once the context thread has run; stopping a context
    # that was never started is a no-op rather than an AttributeError.
    # NOTE(review): stop() may be invoked from a thread other than the loop
    # thread -- confirm the loop implementation tolerates that.
    if self.__loop is not None:
      self.__loop.stop()

  def spawn(self, process):
    """Spawn a process.

    Spawning a process binds it to this context and assigns the process a
    pid which is returned.  The process' ``initialize`` method is called.

    Note: A process cannot send messages until it is bound to a context.

    :param process: The process to bind to this context.
    :type process: :class:`Process`
    :return: The pid of the process.
    :rtype: :class:`PID`
    """
    self._assert_started()
    process.bind(self)
    self.http.mount_process(process)
    self._processes[process.pid] = process
    process.initialize()
    return process.pid

  def _get_dispatch_method(self, pid, method):
    """Look up the attribute ``method`` on the local process ``pid``.

    :raises Context.InvalidProcess: If ``pid`` is not bound to this context.
    :raises Context.InvalidMethod: If the process has no such attribute.
    """
    try:
      return getattr(self._processes[pid], method)
    except KeyError:
      raise self.InvalidProcess('Unknown process %s' % pid)
    except AttributeError:
      raise self.InvalidMethod('Unknown method %s on %s' % (method, pid))

  def dispatch(self, pid, method, *args):
    """Call a method on another process by its pid.

    The method on the other process does not need to be installed with
    ``Process.install``.  The call is serialized with all other calls on the
    context's event loop.  The pid must be bound to this context.

    This function returns immediately.

    :param pid: The pid of the process to be called.
    :type pid: :class:`PID`
    :param method: The name of the method to be called.
    :type method: ``str``
    :return: Nothing
    """
    self._assert_started()
    self._assert_local_pid(pid)
    function = self._get_dispatch_method(pid, method)
    self.__loop.add_callback(function, *args)

  def delay(self, amount, pid, method, *args):
    """Call a method on another process after a specified delay.

    This is equivalent to ``dispatch`` except with an additional amount of
    time to wait prior to invoking the call.

    This function returns immediately.

    :param amount: The amount of time to wait in seconds before making the call.
    :type amount: ``float`` or ``int``
    :param pid: The pid of the process to be called.
    :type pid: :class:`PID`
    :param method: The name of the method to be called.
    :type method: ``str``
    :return: Nothing
    """
    self._assert_started()
    self._assert_local_pid(pid)
    function = self._get_dispatch_method(pid, method)
    self.__loop.add_timeout(self.__loop.time() + amount, function, *args)

  def __dispatch_on_connect_callbacks(self, to_pid, stream):
    """Schedule every callback waiting on a connection to ``to_pid``."""
    # Take ownership of the pending list under the lock, schedule outside it.
    with self._connection_callbacks_lock:
      callbacks = self._connection_callbacks.pop(to_pid, [])
    for callback in callbacks:
      log.debug('Dispatching connection callback %s for %s:%s -> %s',
                callback, self.ip, self.port, to_pid)
      self.__loop.add_callback(callback, stream)

  def _maybe_connect(self, to_pid, callback=None):
    """Asynchronously establish a connection to the remote pid.

    If a stream to ``to_pid`` already exists, ``callback`` is scheduled with it
    right away.  If a connection attempt is already in flight, ``callback`` is
    queued behind it.  Otherwise a new connection attempt is started.

    :param to_pid: The remote pid to connect to.
    :param callback: Called with the stream once the connection is available.
    :raises Context.SocketError: If the connection cannot be initiated.
    """

    callback = stack_context.wrap(callback or (lambda stream: None))

    # Becomes non-empty once this attempt has connected.  A list serves as a
    # mutable flag so the closures below can update it on Python 2 as well.
    established = []

    def streaming_callback(data):
      # we are not guaranteed to get an acknowledgment, but log and discard bytes if we do.
      log.info('Received %d bytes from %s, discarding.', len(data), to_pid)
      log.debug(' data: %r', data)

    def on_connect(exit_cb, stream):
      log.info('Connection to %s established', to_pid)
      established.append(True)
      with self._connection_callbacks_lock:
        self._connections[to_pid] = stream
      # Must run outside the lock: it acquires the same (non-reentrant) lock.
      self.__dispatch_on_connect_callbacks(to_pid, stream)
      self.__loop.add_callback(
          stream.read_until_close,
          exit_cb,
          streaming_callback=streaming_callback)

    def on_close():
      # If the stream closed before it ever connected, the callbacks queued
      # for it can never fire.  Drop them; otherwise the non-empty queue would
      # make every later attempt to reach this pid wait forever instead of
      # opening a fresh connection.
      if not established:
        with self._connection_callbacks_lock:
          stale = self._connection_callbacks.pop(to_pid, [])
        if stale:
          log.warning('Connection to %s failed, dropping %d pending callback(s)',
                      to_pid, len(stale))
      self.__on_exit(to_pid, b'reached end of stream')

    create = False
    with self._connection_callbacks_lock:
      stream = self._connections.get(to_pid)
      if stream is None:
        # Callbacks already queued means another attempt is in flight.
        already_connecting = bool(self._connection_callbacks.get(to_pid))
        self._connection_callbacks[to_pid].append(callback)
        create = not already_connecting

    if stream is not None:
      self.__loop.add_callback(callback, stream)
      return

    if not create:
      return

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    if not sock:
      raise self.SocketError('Failed opening socket')

    stream = IOStream(sock, io_loop=self.__loop)
    stream.set_nodelay(True)
    stream.set_close_callback(on_close)

    connect_callback = partial(on_connect, partial(self.__on_exit, to_pid), stream)

    log.info('Establishing connection to %s', to_pid)

    stream.connect((to_pid.ip, to_pid.port), callback=connect_callback)

    if stream.closed():
      raise self.SocketError('Failed to initiate stream connection')

    log.info('Maybe connected to %s', to_pid)

  def _get_local_mailbox(self, pid, method):
    """Return the handler the local process ``pid`` registered for ``method``.

    :return: The handler, or ``None`` if the process has no such mailbox.
    """
    for mailbox, handler in self._processes[pid].iter_handlers():
      if method == mailbox:
        return handler
    return None

  def send(self, from_pid, to_pid, method, body=None):
    """Send a message method from one pid to another with an optional body.

    Note: It is more idiomatic to send directly from a bound process rather
    than calling send on the context.

    If the destination pid is on the same context, the Context may skip the
    wire and route directly to process itself.  ``from_pid`` must be bound to
    this context.

    This method returns immediately.

    :param from_pid: The pid of the sending process.
    :type from_pid: :class:`PID`
    :param to_pid: The pid of the destination process.
    :type to_pid: :class:`PID`
    :param method: The method name of the destination process.
    :type method: ``str``
    :keyword body: Optional content to send along with the message.
    :type body: ``bytes`` or None
    :return: Nothing
    """

    self._assert_started()
    self._assert_local_pid(from_pid)

    if self._is_local(to_pid):
      local_method = self._get_local_mailbox(to_pid, method)
      if local_method:
        log.info('Doing local dispatch of %s => %s (method: %s)',
                 from_pid, to_pid, local_method)
        self.__loop.add_callback(local_method, from_pid, body or b'')
        return
      # TODO(wickman) Consider failing hard if no local method is detected, otherwise we're
      # just going to do a POST and have it dropped on the floor.

    request_data = encode_request(from_pid, to_pid, method, body=body)

    log.info('Sending POST %s => %s (payload: %d bytes)',
             from_pid, to_pid.as_url(method), len(request_data))

    def on_connect(stream):
      log.info('Writing %s from %s to %s', len(request_data), from_pid, to_pid)
      stream.write(request_data)
      log.info('Wrote %s from %s to %s', len(request_data), from_pid, to_pid)

    # Connection setup is always performed on the event loop.
    self.__loop.add_callback(self._maybe_connect, to_pid, on_connect)

  def __erase_link(self, to_pid):
    """Remove ``to_pid`` from every link set and notify the linking processes."""
    # Iterate over a snapshot: ``exited`` is process code and may alter links.
    for pid, links in list(self._links.items()):
      if to_pid not in links:
        continue
      links.remove(to_pid)
      log.debug('PID link from %s <- %s exited.', pid, to_pid)
      process = self._processes.get(pid)
      # The linking process may itself have been terminated already.
      if process is not None:
        process.exited(to_pid)

  def __on_exit(self, to_pid, body):
    """Handle the stream to ``to_pid`` going away."""
    log.info('Disconnected from %s (%s)', to_pid, body)
    stream = self._connections.pop(to_pid, None)
    if stream is None:
      log.error('Received disconnection from %s but no stream found.', to_pid)
    self.__erase_link(to_pid)

  def terminate(self, pid):
    """Terminate a process bound to this context.

    When a process is terminated, all the processes to which it is linked
    will have their ``exited`` methods called.  Messages to this process
    will no longer be delivered.

    This method returns immediately.

    :param pid: The pid of the process to terminate.
    :type pid: :class:`PID`
    :returns: Nothing
    """
    self._assert_started()

    log.info('Terminating %s', pid)
    process = self._processes.pop(pid, None)
    if process:
      log.info('Unmounting %s', process)
      self.http.unmount_process(process)
    self.__erase_link(pid)

  def __str__(self):
    return 'Context(%s:%s)' % (self.ip, self.port)