Source code for pyesys.event

import threading
import inspect
import asyncio
import weakref
import gc
from typing import Callable, List, Tuple, Optional, Union, Iterable, Coroutine, Any
from concurrent.futures import Future

from .handler import EventHandler, ErrorHandler, default_error_handler, P


[docs] class Event: """ A thread-safe event dispatcher with comprehensive features: - Weak-reference support for bound methods (automatic cleanup) - Runtime signature checking via example function - Configurable error handling with consistent behavior - Introspection: handler_count, handlers list - Duplicate subscription control - Mixed sync/async support with proper resource management - Performance optimizations with lazy cleanup Generic P: parameter specification for handler arguments. """
[docs] class Listener: """ Provides a restricted interface to subscribe/unsubscribe handlers to the outer Event without exposing emission or clearing capabilities. This separation ensures subscribers cannot accidentally trigger events or clear all handlers. """ __slots__ = ("_outer", "_per_instance") def __init__(self, outer: "Event"): """ Initialize a Listener for a given Event. :param outer: The Event instance to subscribe to. """ self._outer = outer self._per_instance = None # Set by EventDescriptor for introspection def __iadd__(self, handler: Union[Callable[P, None], Iterable[Callable[P, None]]]) -> "Event.Listener": """ Subscribe one or more handlers to the Event. Supports single callables or iterables (list, tuple, set). :param handler: Callable or iterable of callables to subscribe. :return: Self for chaining. :raises TypeError: If handler is not callable. """ self._outer += handler return self def __isub__(self, handler: Union[Callable[P, None], Iterable[Callable[P, None]]]) -> "Event.Listener": """ Unsubscribe one or more handlers from the Event. Supports single callables or iterables (list, tuple, set). :param handler: Callable or iterable of callables to unsubscribe. :return: Self for chaining. :raises TypeError: If handler is not callable. """ self._outer -= handler return self
[docs] def subscribe(self, handler: Union[Callable[P, None], Iterable[Callable[P, None]]]) -> None: """ Alternative to += operator for subscribing handlers. Supports single callables or iterables (list, tuple, set). :param handler: Callable or iterable of callables to subscribe. :raises TypeError: If handler is not callable. """ self._outer += handler
[docs] def unsubscribe(self, handler: Union[Callable[P, None], Iterable[Callable[P, None]]]) -> None: """ Alternative to -= operator for unsubscribing handlers. Supports single callables or iterables (list, tuple, set). :param handler: Callable or iterable of callables to unsubscribe. :raises TypeError: If handler is not callable. """ self._outer -= handler
[docs] def handler_count(self) -> int: """ Number of currently alive handlers. :return: Count of active handlers. """ return self._outer.handler_count()
def __init__( self, *, allow_duplicates: bool = True, error_handler: Optional[ErrorHandler] = None, ): """ Initialize an Event with no handlers. :param allow_duplicates: If True, the same handler can be added multiple times. :param error_handler: Custom error handler, or None for default behavior. """ self._handlers: List[EventHandler] = [] self._lock = threading.RLock() # RLock for potential recursive calls self.listener = Event.Listener(self) self._sig: Optional[inspect.Signature] = None self._allow_duplicates = allow_duplicates self._error_handler = error_handler or default_error_handler self._cleanup_counter = 0 # For optimized cleanup @staticmethod def _is_signature_compatible( sig: inspect.Signature, expected_sig: inspect.Signature ) -> bool: """ Determine if two signatures are compatible in parameter count and kinds, ignoring type annotations. :param sig: Signature of a handler to check. :param expected_sig: Expected signature to compare against. :return: True if signatures are compatible. """ sig_params = list(sig.parameters.values()) expected_params = list(expected_sig.parameters.values()) if len(sig_params) != len(expected_params): return False for p, q in zip(sig_params, expected_params): if p.kind != q.kind: return False return True def _cleanup_dead_handlers(self, force: bool = False) -> None: """ Remove dead handlers from the internal list with performance optimization. Uses lazy cleanup strategy to avoid overhead on every operation. Must be called with lock held. :param force: If True, always perform cleanup regardless of counter. """ if not force: self._cleanup_counter += 1 # Perform cleanup every 10 operations, on first operation, or when forced should_cleanup = force or ( self._cleanup_counter == 1 or self._cleanup_counter % 10 == 0 ) if should_cleanup: original_count = len(self._handlers) # Force multiple garbage collection cycles for more reliable cleanup if original_count > 0: for _ in range(3): # Multiple cycles for thorough cleanup gc.collect() # Filter out dead handlers alive_handlers = [] for h in self._handlers: if h.is_alive(): alive_handlers.append(h) self._handlers[:] = alive_handlers # Reset counter if we actually cleaned up something if len(self._handlers) < original_count: self._cleanup_counter = 0 def _validate_handler(self, handler: Callable[P, None]) -> None: """ Validate that a handler is callable and has compatible signature. :param handler: Handler to validate. :raises TypeError: If handler is invalid or incompatible. """ if not callable(handler): raise TypeError(f"Handler must be callable, got {type(handler)}") if self._sig: try: # Handle bound methods - drop their first parameter before comparing if hasattr(handler, "__self__") and handler.__self__ is not None: full_sig = inspect.signature(handler.__func__) params = list(full_sig.parameters.values())[1:] # drop 'self' handler_sig = full_sig.replace(parameters=params) else: handler_sig = inspect.signature(handler) if not Event._is_signature_compatible(handler_sig, self._sig): raise TypeError( f"Handler signature {handler_sig} is not compatible with " f"expected {self._sig}" ) except (ValueError, TypeError) as e: raise TypeError(f"Cannot inspect handler signature: {e}")
[docs] def subscribe_one(self, handler: Callable[P, None]) -> None: """ Subscribe a single handler to this Event with validation and duplicate control. Performs runtime signature checking. :param handler: Callable to subscribe. :raises TypeError: If handler is invalid or incompatible. """ self._validate_handler(handler) h = handler if isinstance(handler, EventHandler) else EventHandler(handler) with self._lock: if self._allow_duplicates or h not in self._handlers: self._handlers.append(h)
[docs] def unsubscribe_one(self, handler: Callable[P, None]) -> None: """ Unsubscribe a single handler from this Event. :param handler: Callable previously subscribed. :raises TypeError: If handler is not callable. """ if not callable(handler): raise TypeError(f"Handler must be callable, got {type(handler)}") h = handler if isinstance(handler, EventHandler) else EventHandler(handler) with self._lock: try: self._handlers.remove(h) except ValueError: pass # Handler not found, ignore silently
def __iadd__(self, handler: Union[Callable[P, None], Iterable[Callable[P, None]]]) -> "Event": """ Subscribe one or more handlers to this Event with validation and duplicate control. Supports single callables or iterables (list, tuple, set). Performs runtime signature checking. :param handler: Callable or iterable of callables to subscribe. :return: Self for chaining. :raises TypeError: If handler is invalid or incompatible. """ if isinstance(handler, (list, tuple, set)): for h in handler: self.subscribe_one(h) else: self.subscribe_one(handler) return self def __isub__(self, handler: Union[Callable[P, None], Iterable[Callable[P, None]]]) -> "Event": """ Unsubscribe one or more handlers from this Event. Supports single callables or iterables (list, tuple, set). If duplicates exist, only the first matching handler is removed. :param handler: Callable or iterable of callables to unsubscribe. :return: Self for chaining. :raises TypeError: If handler is not callable. """ if isinstance(handler, (list, tuple, set)): for h in handler: self.unsubscribe_one(h) else: self.unsubscribe_one(handler) return self
[docs] def emit(self, *args: P.args, **kwargs: P.kwargs) -> None: """ Emit the event synchronously, invoking all subscribed handlers. Error handling isolates exceptions: one handler's exception does not prevent others from running. Dead bound-method handlers are automatically removed during cleanup. Async handlers that return coroutines are silently ignored in sync emission. :param args: Positional arguments matching signature P. :param kwargs: Keyword arguments matching signature P. """ with self._lock: # Optimized cleanup and snapshot for iteration self._cleanup_dead_handlers() handlers_snapshot = list(self._handlers) # Invoke each handler with consistent error handling for h in handlers_snapshot: try: result = h(*args, **kwargs) # If handler returned a coroutine, we can't await it in sync context # Just ignore it - this is expected behavior for mixed sync/async events if inspect.iscoroutine(result): result.close() # Properly close the coroutine to avoid warnings except Exception as e: callback = h.get_callback() if callback is not None: # Only report errors for live handlers try: self._error_handler(e, callback) except Exception: # Prevent error handler exceptions from propagating pass
[docs] async def emit_async(self, *args: P.args, **kwargs: P.kwargs) -> None: """ Asynchronously emit the event, invoking all subscribed handlers concurrently. Handlers that are coroutine functions will be awaited; regular callables will be scheduled on the default executor. All exceptions are consistently routed to the configured error_handler. :param args: Positional arguments matching signature P. :param kwargs: Keyword arguments matching signature P. """ with self._lock: self._cleanup_dead_handlers() handlers_snapshot = list(self._handlers) if not handlers_snapshot: return loop = asyncio.get_running_loop() tasks: List[Future] = [] for h in handlers_snapshot: callback = h.get_callback() if callback is None: continue if inspect.iscoroutinefunction(callback): # Handle async functions (including bound async methods) coroutine = callback(*args, **kwargs) tasks.append( loop.create_task(self._wrap_async_handler(coroutine, callback)) ) elif hasattr(callback, "__func__") and inspect.iscoroutinefunction( callback.__func__ ): # Handle bound methods of async functions coroutine = callback(*args, **kwargs) tasks.append( loop.create_task(self._wrap_async_handler(coroutine, callback)) ) else: # Handle sync functions in thread pool tasks.append( loop.run_in_executor( None, self._wrap_sync_handler, callback, args, kwargs ) ) if tasks: # Wait for all tasks to complete with consistent error handling await asyncio.gather(*tasks, return_exceptions=True)
async def _wrap_async_handler( self, coro: Coroutine, handler: Callable[P, None] ) -> None: """ Await a coroutine handler and route exceptions consistently. :param coro: Coroutine to await. :param handler: Original handler function for error reporting. """ try: await coro except Exception as e: try: self._error_handler(e, handler) except Exception: # Prevent error handler exceptions from propagating pass def _wrap_sync_handler( self, handler: Callable[P, None], args: tuple[Any, ...], kwargs: dict[str, Any] ) -> None: """ Execute a synchronous handler and route exceptions consistently. :param handler: Callable to run. :param args: Tuple of positional arguments for the handler. :param kwargs: Dictionary of keyword arguments for the handler. """ try: handler(*args, **kwargs) except Exception as e: try: self._error_handler(e, handler) except Exception: # Prevent error handler exceptions from propagating pass
[docs] def clear(self) -> None: """ Remove all subscribed handlers from this Event. This is useful for cleanup or resetting event state. """ with self._lock: self._handlers.clear() self._cleanup_counter = 0
[docs] def handler_count(self) -> int: """ Number of currently alive handlers subscribed. This property triggers cleanup of dead handlers for accurate counting. :return: Count of active handlers. """ with self._lock: self._cleanup_dead_handlers(force=True) return len(self._handlers)
@property def handlers(self) -> List[Callable[P, None]]: """ Return a list of currently alive handler callables. This property triggers cleanup and reconstructs callable references. :return: List of active handler functions or bound methods. """ with self._lock: self._cleanup_dead_handlers(force=True) return [ h.get_callback() for h in self._handlers if h.get_callback() is not None ] def __bool__(self) -> bool: """ Return True if there are any alive handlers. :return: True if event has active handlers. """ return self.handler_count() > 0 def __len__(self) -> int: """ Return number of alive handlers. :return: Count of active handlers. """ return self.handler_count()
[docs] @classmethod def new( cls, example: Callable[P, None], *, allow_duplicates: bool = True, error_handler: Optional[ErrorHandler] = None, ) -> Tuple["Event", "Event.Listener"]: """ Factory method to create an Event with runtime signature checking. :param example: Example function whose signature defines allowed handler signature. :param allow_duplicates: If True, same handler can be subscribed multiple times. :param error_handler: Custom error handler for exceptions during emission. :return: Tuple of (Event instance, Listener interface). :raises TypeError: If example is not callable. """ if not callable(example): raise TypeError(f"Example must be callable, got {type(example)}") e = cls(allow_duplicates=allow_duplicates, error_handler=error_handler) e._sig = inspect.signature(example) return e, e.listener
# Factory alias for brevity (signature-checked events) create_event = Event.new