# Usage Guide
This guide demonstrates how to use PyESys for event-driven programming:

- Create and emit events with type safety
- Subscribe synchronous and asynchronous handlers
- Use the `@event` decorator for clean API design
- Manage handler lifecycles efficiently
- Chain events for complex workflows
- Handle errors gracefully in production
## Installation
PyESys requires Python 3.12+ and has zero external dependencies.

```bash
pip install pyesys
```

For local development with testing tools:

```bash
git clone https://github.com/fisothemes/pyesys.git
cd pyesys
pip install -e .[dev]
```
## Quick Start: Creating Events
The `create_event()` function creates an event emitter and listener pair. Use an example function to define the expected handler signature:
```python
from pyesys import create_event

# Create event with signature validation
event, listener = create_event(example=lambda msg: None)

def log_message(msg: str):
    print(f"[LOG] {msg}")

def store_message(msg: str):
    # Store to database, file, etc.
    pass

# Subscribe handlers
listener += log_message
listener += store_message

# Emit to all subscribers
event.emit("System started successfully")
```
The example function enforces that every handler accepts exactly one parameter, so signature mismatches are caught at subscription time instead of surfacing later at emit time.
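To see this in action, try subscribing a handler with the wrong arity. A minimal sketch, assuming a mismatch raises `TypeError` (the exact exception type is an assumption, not confirmed by this guide):

```python
def bad_handler(msg, extra):  # Two parameters, but the event expects one
    pass

try:
    listener += bad_handler  # Rejected at subscription time, before any emit
except TypeError as exc:
    print(f"Subscription rejected: {exc}")
```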
## Decorator Pattern: Module-Level Events
For application-wide events, use the `@event` decorator at module level to create clean, reusable event APIs:
```python
import time

from pyesys import event

@event
def on_user_login(user_id: str, timestamp: float) -> None:
    """Fired when a user successfully logs in"""

@on_user_login.emitter
def login_user(user_id: str, timestamp: float) -> None:
    """Authenticate user and emit login event"""
    # Perform authentication logic here
    print(f"User {user_id} authenticated at {timestamp}")
    # Event automatically emitted after function completes

# Subscribe to login events
def update_last_seen(user_id: str, timestamp: float):
    print(f"Updating last seen for {user_id}")

def log_access(user_id: str, timestamp: float):
    print(f"Access logged: {user_id} at {timestamp}")

on_user_login += [update_last_seen, log_access]

# Trigger authentication and event
login_user("alice", time.time())
```
The `@event` decorator creates a module-level event, while `@on_user_login.emitter` creates a function that automatically emits the event after executing its body.
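Handlers can be detached again when they are no longer needed. A short sketch, assuming the decorated event supports `-=` symmetrically with the `+=` shown above:

```python
# Detach one handler; subsequent logins only reach update_last_seen
on_user_login -= log_access

login_user("bob", time.time())
```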
## Decorator Pattern: Per-Instance Events
Class-level events provide per-instance isolation, perfect for component-based architectures:
```python
from pyesys import event

class FileProcessor:
    @event
    def on_progress(self, filename: str, percent: float) -> None:
        """Progress update event"""

    @event
    def on_complete(self, filename: str, result: dict) -> None:
        """Processing complete event"""

    @on_progress.emitter
    def _update_progress(self, filename: str, percent: float):
        """Internal progress updater"""
        pass  # Event emitted automatically

    @on_complete.emitter
    def _finish_processing(self, filename: str, result: dict):
        """Internal completion handler"""
        pass  # Event emitted automatically

    def process_file(self, filename: str):
        print(f"Starting processing: {filename}")
        # Simulate processing with progress updates
        for i in range(0, 101, 25):
            self._update_progress(filename, i)
        result = {"status": "success", "lines": 1000}
        self._finish_processing(filename, result)

# Each processor instance has independent events
processor1 = FileProcessor()
processor2 = FileProcessor()

# Subscribe different handlers to each instance
processor1.on_progress += lambda f, p: print(f"P1: {f} at {p}%")
processor2.on_progress += lambda f, p: print(f"P2: {f} at {p}%")

processor1.process_file("data1.txt")
processor2.process_file("data2.txt")
```
Each `FileProcessor` instance maintains its own event handlers, preventing cross-instance interference.
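Continuing the example, the isolation is easy to verify: subscribe a completion handler to only one instance and confirm the other is unaffected:

```python
# Only processor1 gets a completion handler
processor1.on_complete += lambda f, r: print(f"P1 done: {f} -> {r}")

processor1.process_file("data3.txt")  # Completion handler fires
processor2.process_file("data4.txt")  # No completion handler fires here
```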
## Efficient Handler Management
PyESys supports bulk operations for managing multiple handlers efficiently:
```python
from pyesys import create_event

event, listener = create_event(example=lambda data: None)

def handler_a(data): print("A:", data)
def handler_b(data): print("B:", data)
def handler_c(data): print("C:", data)
def handler_d(data): print("D:", data)

# Bulk subscribe using collections
listener += [handler_a, handler_b, handler_c]
listener += {handler_d}  # Sets work too

print(f"Active handlers: {listener.handler_count()}")  # 4

# Bulk unsubscribe
listener -= [handler_a, handler_c]
print(f"Remaining handlers: {listener.handler_count()}")  # 2

# Clear all handlers
event.clear()
print(f"After clear: {listener.handler_count()}")  # 0
```
This pattern is especially useful for plugin systems or dynamic handler registration scenarios.
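As a sketch of the plugin idea, a plain dict can map plugin names to handlers so whole groups toggle with the same bulk operators (the registry below is illustrative, not part of PyESys):

```python
from pyesys import create_event

event, listener = create_event(example=lambda data: None)

# Hypothetical plugin registry: name -> handler
plugins = {
    "audit": lambda data: print(f"[audit] {data}"),
    "cache": lambda data: print(f"[cache] {data}"),
}

def enable_plugins(names):
    listener += [plugins[name] for name in names]

def disable_plugins(names):
    listener -= [plugins[name] for name in names]

enable_plugins(["audit", "cache"])
event.emit("payload")       # Both plugins run

disable_plugins(["cache"])
event.emit("payload")       # Only the audit plugin runs
```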
## Event Chaining and Workflows
Chain events between objects to create flexible processing pipelines:
```python
from pyesys import event
from abc import ABC
import json

class DataProcessor(ABC):
    @event
    def on_processed(self, data: dict, metadata: dict) -> None:
        """Emitted when processing completes"""

    @on_processed.emitter
    def _emit_processed(self, data: dict, metadata: dict):
        """Internal method to emit processing event"""
        pass

class JsonParser(DataProcessor):
    def process(self, raw_data: str):
        print("Parsing JSON...")
        try:
            data = json.loads(raw_data)
            metadata = {"parser": "json", "status": "success"}
        except json.JSONDecodeError:
            data = {}
            metadata = {"parser": "json", "status": "error"}
        self._emit_processed(data, metadata)

class DataValidator(DataProcessor):
    def validate(self, data: dict, metadata: dict):
        print(f"Validating data (previous: {metadata['status']})...")
        if metadata["status"] == "error":
            metadata["validator"] = "skipped"
        else:
            # Perform validation
            is_valid = "name" in data and "id" in data
            metadata["validator"] = "passed" if is_valid else "failed"
        self._emit_processed(data, metadata)

class DataStore(DataProcessor):
    def store(self, data: dict, metadata: dict):
        print(f"Storing data (validation: {metadata.get('validator', 'none')})...")
        if metadata.get("validator") == "passed":
            print(f"✓ Stored: {data}")
            metadata["storage"] = "success"
        else:
            print("✗ Storage skipped due to validation failure")
            metadata["storage"] = "skipped"
        self._emit_processed(data, metadata)

# Create pipeline
parser = JsonParser()
validator = DataValidator()
store = DataStore()

# Chain the processors
parser.on_processed += validator.validate
validator.on_processed += store.store

# Final result handler
def log_final_result(data: dict, metadata: dict):
    print(f"Pipeline complete: {metadata}")

store.on_processed += log_final_result

# Process data through the pipeline
parser.process('{"name": "Alice", "id": 123}')
print("---")
parser.process('{"invalid": json}')
```
This pattern enables flexible, testable processing chains where each component can be developed and tested independently.
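That independence also makes each stage easy to unit-test: subscribe a recording handler to a single stage and drive it directly, no full pipeline required:

```python
# Test DataValidator in isolation with a recording handler
validator = DataValidator()
captured = []
validator.on_processed += lambda data, meta: captured.append((data, meta))

validator.validate({"name": "Bob"}, {"parser": "json", "status": "success"})
assert captured[0][1]["validator"] == "failed"  # "id" key is missing
```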
## Asynchronous Event Handling
PyESys seamlessly handles mixed sync/async handlers, running them concurrently when possible:
```python
import asyncio
import time
from pyesys import create_event

event, listener = create_event(example=lambda data: None)

def sync_handler(data):
    """Synchronous handler - runs in a thread pool"""
    print(f"Sync processing: {data}")
    time.sleep(0.1)  # Simulate work
    print(f"Sync complete: {data}")

async def async_handler(data):
    """Asynchronous handler - runs in the event loop"""
    print(f"Async processing: {data}")
    await asyncio.sleep(0.1)  # Simulate async work
    print(f"Async complete: {data}")

async def slow_async_handler(data):
    """Another async handler with different timing"""
    await asyncio.sleep(0.2)
    print(f"Slow async complete: {data}")

# Subscribe mixed handler types
listener += [sync_handler, async_handler, slow_async_handler]

async def main():
    print("Emitting to mixed handlers...")
    await event.emit_async("test-data")
    print("All handlers completed")

# Run the async emission
asyncio.run(main())
```
The `emit_async()` method ensures all handlers complete before returning, with sync handlers running in a thread pool to avoid blocking the event loop.
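A rough way to confirm the concurrency is to time an emission. If the three handlers above run concurrently as described, the elapsed time should be close to the slowest handler (about 0.2 s) rather than the sum of all three (about 0.4 s):

```python
async def timed_emit():
    start = time.perf_counter()
    await event.emit_async("timed-data")
    print(f"Elapsed: {time.perf_counter() - start:.2f}s")

asyncio.run(timed_emit())
```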
## Production Error Handling
Implement robust error handling to prevent one failing handler from affecting others:
```python
from pyesys import create_event
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def production_error_handler(exception: Exception, handler_func):
    """Custom error handler for production use"""
    logger.error(
        f"Event handler {handler_func.__name__} failed: {exception}",
        exc_info=True
    )
    # Could also:
    # - Send to an error tracking service
    # - Increment failure metrics
    # - Disable repeatedly failing handlers

# Create event with custom error handling
event, listener = create_event(
    example=lambda x: None,
    error_handler=production_error_handler
)

def reliable_handler(x):
    print(f"Reliable handler: {x}")

def unreliable_handler(x):
    if x == "trigger_error":
        raise ValueError("Something went wrong!")
    print(f"Unreliable handler: {x}")

def another_handler(x):
    print(f"Another handler: {x}")

listener += [reliable_handler, unreliable_handler, another_handler]

# Test error handling
event.emit("normal_data")    # All handlers run
print("---")
event.emit("trigger_error")  # unreliable_handler fails, others continue
```
Custom error handlers allow you to integrate with your monitoring and logging infrastructure while ensuring system resilience.
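One way to act on the "disable repeatedly failing handlers" idea from the comment above is a small circuit breaker. The threshold and the choice to unsubscribe from inside the error handler are illustrative assumptions, not documented PyESys behavior:

```python
from collections import Counter

failures = Counter()
MAX_FAILURES = 3  # Illustrative threshold

def circuit_breaker_error_handler(exception: Exception, handler_func):
    """Log each failure and unsubscribe handlers that keep failing."""
    failures[handler_func] += 1
    logger.error(f"Handler {handler_func.__name__} failed: {exception}")
    if failures[handler_func] >= MAX_FAILURES:
        # Assumes removing a handler during dispatch is safe
        listener -= handler_func
        logger.warning(
            f"Disabled {handler_func.__name__} after {MAX_FAILURES} failures"
        )

# Pass as error_handler= to create_event() to activate
```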
## Debugging and Introspection
PyESys provides tools for debugging and monitoring event handler state:
```python
from pyesys import create_event

event, listener = create_event(example=lambda x: None)

def handler_one(x): pass
def handler_two(x): pass

listener += [handler_one, handler_two]

# Inspect current handlers
print(f"Handler count: {listener.handler_count()}")

print("\nActive handlers:")
for i, handler in enumerate(event.handlers):
    print(f"  {i}: {handler}")

# Check if a specific handler is subscribed
is_subscribed = any(h == handler_one for h in event.handlers)
print(f"\nhandler_one subscribed: {is_subscribed}")

# Remove a specific handler
listener -= handler_one
print(f"After removal: {listener.handler_count()}")
```
These introspection capabilities are valuable for debugging complex event-driven systems and ensuring proper handler lifecycle management.
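These checks are easy to wrap in a small helper; the function below is a hypothetical convenience, not part of PyESys:

```python
def dump_event_state(name: str, event, listener) -> None:
    """Print a one-screen summary of an event's subscribers."""
    print(f"--- {name}: {listener.handler_count()} handler(s) ---")
    for i, handler in enumerate(event.handlers):
        print(f"  {i}: {getattr(handler, '__qualname__', repr(handler))}")

dump_event_state("demo", event, listener)
```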
## Best Practices
**Type Safety**: Always use descriptive example functions that match your handler signatures:

```python
# Good: clear, fully annotated signature
def user_action_example(user_id: str, action: str, timestamp: float) -> None: ...

event, listener = create_event(example=user_action_example)

# Avoid: vague signatures that defeat validation
event, listener = create_event(example=lambda *args: None)
```
**Memory Management**: PyESys uses weak references automatically, but be mindful of handler lifecycles:

```python
class EventHandler:
    def handle_event(self, data):
        print(f"Handling: {data}")

# With weak references, this handler can be garbage collected (and thus
# silently unsubscribed) once 'handler' goes out of scope
handler = EventHandler()
listener += handler.handle_event

# Keep a reference, e.g. as an attribute on a long-lived object, if the
# handler needs to persist
persistent_handler = EventHandler()
listener += persistent_handler.handle_event
```
**Error Resilience**: Always implement custom error handlers in production systems to prevent cascading failures.
## More Examples

Find complete working examples and advanced patterns in the GitHub repository: https://github.com/fisothemes/pyesys