Event System
Priority-based event system with subscriber management, groups, and async execution.
Pub-SubPriorityAsyncGroupsSubscribers
Quick Start
Basic Event Handling
dart
// Register an event listener
Khadem.eventBus.on('user.created', (user) async {
print('New user created: ${user['name']}');
await sendWelcomeEmail(user);
});
// Emit an event with payload
await Khadem.eventBus.emit('user.created', {
'id': 123,
'name': 'John Doe',
'email': 'john@example.com'
});
💡 Note: Events are automatically available via Khadem.eventBus
⚡ Tip: Use descriptive event names like 'user.created'
or 'order.placed'
Event Priorities
dart
// Register listeners with different priorities
Khadem.eventBus.on('order.processed', (order) async {
// Critical: Security logging
await logSecurityEvent('Order processed', order);
}, priority: EventPriority.critical);
Khadem.eventBus.on('order.processed', (order) async {
// High: Business logic
await updateInventory(order);
}, priority: EventPriority.high);
Khadem.eventBus.on('order.processed', (order) async {
// Normal: Notifications
await sendOrderConfirmation(order);
}, priority: EventPriority.normal);
Khadem.eventBus.on('order.processed', (order) async {
// Low: Analytics
await trackAnalytics('order_processed', order);
}, priority: EventPriority.low);
// Emit event - listeners execute in priority order
await Khadem.eventBus.emit('order.processed', orderData);
Priority Levels
- •
EventPriority.low
- Background tasks, cleanup - •
EventPriority.normal
- Standard event handling - •
EventPriority.high
- Important business logic - •
EventPriority.critical
- Security, logging, monitoring
One-Time Listeners
dart
// Register one-time listeners
Khadem.eventBus.once('app.initialized', () async {
print('App initialized for the first time!');
await performInitialSetup();
});
Khadem.eventBus.once('database.migrated', (version) async {
print('Database migrated to version: $version');
await updateMigrationStatus(version);
});
// Or use the once() convenience method
Khadem.eventBus.once('cache.warmed', () async {
print('Cache warmed up successfully');
});
// These listeners will be automatically removed after first execution
await Khadem.eventBus.emit('app.initialized');
await Khadem.eventBus.emit('database.migrated', '1.2.0');
Use Cases
- • Application initialization
- • Resource cleanup after first use
- • Setup tasks that should run only once
- • Migration or upgrade handlers
Event Groups
dart
// Create event groups for related operations
Khadem.eventBus.addToGroup('user.lifecycle', 'user.created');
Khadem.eventBus.addToGroup('user.lifecycle', 'user.updated');
Khadem.eventBus.addToGroup('user.lifecycle', 'user.deleted');
Khadem.eventBus.addToGroup('order.workflow', 'order.placed');
Khadem.eventBus.addToGroup('order.workflow', 'order.confirmed');
Khadem.eventBus.addToGroup('order.workflow', 'order.shipped');
// Register listeners for group events
Khadem.eventBus.on('user.created', (user) async => await logUserActivity(user, 'created'));
Khadem.eventBus.on('user.updated', (user) async => await logUserActivity(user, 'updated'));
Khadem.eventBus.on('user.deleted', (user) async => await logUserActivity(user, 'deleted'));
// Emit entire group at once
await Khadem.eventBus.emitGroup('user.lifecycle', userData);
// Remove events from groups
Khadem.eventBus.removeFromGroup('user.lifecycle', 'user.updated');
Group Benefits
- • Batch emit related events together
- • Organize events by domain or feature
- • Simplify event management
- • Enable/disable entire feature sets
Async Event Execution
dart
// Synchronous execution (default)
Khadem.eventBus.on('file.uploaded', (file) async {
// This blocks other listeners
await processFile(file);
await generateThumbnail(file);
});
// Asynchronous execution (recommended for I/O)
await Khadem.eventBus.emit('file.uploaded', fileData, queue: true);
// Multiple async operations
Khadem.eventBus.on('user.registered', (user) async {
// These run in parallel when queue: true
final emailTask = sendWelcomeEmail(user);
final profileTask = createUserProfile(user);
final analyticsTask = trackRegistration(user);
await Future.wait([emailTask, profileTask, analyticsTask]);
});
// Emit with async processing
await Khadem.eventBus.emit('user.registered', userData, queue: true);
When to Use Async
- • I/O operations (database, network calls)
- • Heavy computations
- • Multiple independent operations
- • Non-blocking event processing
Event Subscribers
dart
// Create an event subscriber class
class UserEventSubscriber implements EventSubscriberInterface {
@override
List<EventMethod> getEventHandlers() => [
EventMethod(
eventName: 'user.created',
handler: _onUserCreated,
priority: EventPriority.high,
),
EventMethod(
eventName: 'user.updated',
handler: _onUserUpdated,
priority: EventPriority.normal,
),
EventMethod(
eventName: 'user.deleted',
handler: _onUserDeleted,
priority: EventPriority.normal,
once: true, // Only handle first deletion
),
];
Future<void> _onUserCreated(dynamic user) async {
await sendWelcomeEmail(user);
await createUserProfile(user);
}
Future<void> _onUserUpdated(dynamic user) async {
await updateUserCache(user);
await logUserChange(user);
}
Future<void> _onUserDeleted(dynamic user) async {
await cleanupUserData(user);
await logUserDeletion(user);
}
}
// Register the subscriber
final subscriber = UserEventSubscriber();
registerSubscribers([subscriber]);
// All cleanup happens automatically when subscriber is destroyed
Khadem.eventBus.offSubscriber(subscriber);
Subscriber Benefits
- • Group related event handlers in one class
- • Automatic cleanup when subscriber is destroyed
- • Better code organization
- • Easier testing and debugging
Event Broadcasting
dart
// Broadcast events to all connected clients
Khadem.eventBus.on('notification.sent', (notification) async {
// Local processing
await saveNotification(notification);
});
// Emit and broadcast to WebSocket clients
await Khadem.eventBus.emit('notification.sent', notificationData, broadcast: true);
// Real-time features
Khadem.eventBus.on('live.update', (update) async {
// Process live update
await applyLiveUpdate(update);
});
// Broadcast live updates to all clients
await Khadem.eventBus.emit('live.update', updateData, broadcast: true);
// Group broadcasting
await Khadem.eventBus.emitGroup('user.updates', userData, broadcast: true);
Broadcast Use Cases
- • Real-time notifications
- • Live updates across clients
- • Distributed system coordination
- • Multi-instance synchronization
Event Management
dart
// Remove specific listener
void removeListener() {
Khadem.eventBus.off('user.created', myHandler);
}
// Remove all listeners for an event
void clearEventListeners() {
Khadem.eventBus.offEvent('user.created');
}
// Remove all listeners for a subscriber
void cleanupSubscriber() {
Khadem.eventBus.offSubscriber(this);
}
// Clear everything (use with caution)
void resetEventSystem() {
Khadem.eventBus.clear();
}
// Check if event has listeners
if (Khadem.eventBus.hasListeners('user.created')) {
print('Event has active listeners');
}
// Get listener count
final count = Khadem.eventBus.listenerCount('user.created');
print('Listeners for user.created: $count');
dart
// Inspect registered listeners
final listeners = Khadem.eventBus.listeners;
for (final entry in listeners.entries) {
print('Event: ${entry.key}');
print('Listeners: ${entry.value.length}');
}
// Inspect event groups
final groups = Khadem.eventBus.eventGroups;
for (final entry in groups.entries) {
print('Group: ${entry.key}');
print('Events: ${entry.value.join(', ')}');
}
// Inspect subscriber events
final subscriberEvents = Khadem.eventBus.subscriberEvents;
for (final entry in subscriberEvents.entries) {
print('Subscriber: ${entry.key}');
print('Events: ${entry.value.join(', ')}');
}
// Debug event system state
void debugEventSystem() {
print('=== Event System Debug ===');
print('Total events: ${listeners.length}');
print('Total groups: ${groups.length}');
print('Total subscribers: ${subscriberEvents.length}');
for (final entry in listeners.entries) {
print('${entry.key}: ${entry.value.length} listeners');
}
}
Management Operations
- •
off(event, listener)
- Remove specific listener - •
offEvent(event)
- Remove all listeners for event - •
offSubscriber(subscriber)
- Remove subscriber's events - •
clear()
- Remove all listeners and groups
Error Handling
dart
// Proper error handling in listeners
Khadem.eventBus.on('user.created', (user) async {
try {
await sendWelcomeEmail(user);
await createUserProfile(user);
await logUserCreation(user);
} catch (e, stackTrace) {
// Log error but don't rethrow
Khadem.logger.error('Failed to process user creation', error: e, stackTrace: stackTrace);
// Optionally emit error event
await Khadem.eventBus.emit('user.creation.failed', {
'user': user,
'error': e.toString(),
'timestamp': DateTime.now(),
});
}
});
// Error event handler
Khadem.eventBus.on('user.creation.failed', (errorData) async {
// Handle failed user creation
await notifyAdmin(errorData);
await cleanupPartialData(errorData['user']);
});
// Async error handling
Khadem.eventBus.on('file.processed', (file) async {
try {
final result = await processFileAsync(file);
await Khadem.eventBus.emit('file.processing.success', result);
} on FileProcessingException catch (e) {
await Khadem.eventBus.emit('file.processing.error', {
'file': file,
'error': e.message,
});
} catch (e) {
await Khadem.eventBus.emit('file.processing.unexpected_error', {
'file': file,
'error': e.toString(),
});
}
});
Error Handling Best Practices
- • Always wrap listener logic in try-catch blocks
- • Log errors but don't rethrow to avoid disrupting other listeners
- • Use async error handling for complex operations
- • Consider using error events for centralized error handling
Best Practices
✅ Do's
- • Use descriptive, namespaced event names
- • Handle errors gracefully in listeners
- • Use appropriate priorities for critical operations
- • Clean up listeners when components are destroyed
- • Use event groups for related operations
- • Document custom events and their payloads
- • Use subscribers for complex event handling
- • Test event interactions thoroughly
❌ Don'ts
- • Don't perform heavy operations in sync listeners
- • Don't rely on event execution order (except priorities)
- • Don't forget to clean up listeners in long-lived objects
- • Don't use events for simple method calls
- • Don't emit events in event listeners (can cause loops)
- • Don't ignore errors in event listeners
- • Don't use anonymous functions if you need to remove them
- • Don't block the event loop with sync operations
Common Patterns
Domain Event Pattern
Organize events by business domain
dart
// Domain-driven event organization
class UserDomainEvents {
static const String created = 'user.domain.created';
static const String updated = 'user.domain.updated';
static const String deleted = 'user.domain.deleted';
static const String passwordChanged = 'user.domain.password_changed';
static const String emailVerified = 'user.domain.email_verified';
}
class OrderDomainEvents {
static const String placed = 'order.domain.placed';
static const String confirmed = 'order.domain.confirmed';
static const String shipped = 'order.domain.shipped';
static const String delivered = 'order.domain.delivered';
static const String cancelled = 'order.domain.cancelled';
}
// Usage with domain events
Khadem.eventBus.on(UserDomainEvents.created, (user) async {
await sendWelcomeEmail(user);
await createUserProfile(user);
});
Khadem.eventBus.on(OrderDomainEvents.placed, (order) async {
await reserveInventory(order);
await processPayment(order);
});
// Group domain events
Khadem.eventBus.addToGroup('user.domain', UserDomainEvents.created);
Khadem.eventBus.addToGroup('user.domain', UserDomainEvents.updated);
Khadem.eventBus.addToGroup('user.domain', UserDomainEvents.deleted);
Khadem.eventBus.addToGroup('order.lifecycle', OrderDomainEvents.placed);
Khadem.eventBus.addToGroup('order.lifecycle', OrderDomainEvents.confirmed);
Khadem.eventBus.addToGroup('order.lifecycle', OrderDomainEvents.shipped);
Observer Pattern
Decouple components with events
dart
// Observer pattern with events
class UserService {
Future<void> createUser(Map<String, dynamic> userData) async {
final user = await _saveUserToDatabase(userData);
await Khadem.eventBus.emit('user.created', user);
}
Future<void> updateUser(int userId, Map<String, dynamic> updates) async {
final user = await _updateUserInDatabase(userId, updates);
await Khadem.eventBus.emit('user.updated', user);
}
}
class EmailService {
EmailService() {
// Subscribe to user events
Khadem.eventBus.on('user.created', _sendWelcomeEmail);
Khadem.eventBus.on('user.updated', _sendUpdateNotification);
}
Future<void> _sendWelcomeEmail(dynamic user) async {
await sendEmail(user['email'], 'Welcome!', 'Welcome to our platform');
}
Future<void> _sendUpdateNotification(dynamic user) async {
await sendEmail(user['email'], 'Profile Updated', 'Your profile has been updated');
}
}
class AnalyticsService {
AnalyticsService() {
Khadem.eventBus.on('user.created', _trackRegistration);
Khadem.eventBus.on('user.updated', _trackProfileUpdate);
}
Future<void> _trackRegistration(dynamic user) async {
await trackEvent('user_registration', {'user_id': user['id']});
}
Future<void> _trackProfileUpdate(dynamic user) async {
await trackEvent('profile_update', {'user_id': user['id']});
}
}
// Usage
final userService = UserService();
final emailService = EmailService(); // Automatically subscribes
final analyticsService = AnalyticsService(); // Automatically subscribes
await userService.createUser(userData); // Triggers both services
Event Middleware
Add cross-cutting concerns
dart
// Event middleware for cross-cutting concerns
class EventMiddleware {
static Future<void> loggingMiddleware(String event, dynamic payload) async {
Khadem.logger.info('Event emitted: $event', extra: {'payload': payload});
await Khadem.eventBus.emit(event, payload);
Khadem.logger.info('Event completed: $event');
}
static Future<void> errorHandlingMiddleware(String event, dynamic payload) async {
try {
await Khadem.eventBus.emit(event, payload);
} catch (e, stackTrace) {
Khadem.logger.error('Event failed: $event', error: e, stackTrace: stackTrace);
await Khadem.eventBus.emit('event.error', {
'event': event,
'payload': payload,
'error': e.toString(),
'timestamp': DateTime.now(),
});
}
}
static Future<void> performanceMiddleware(String event, dynamic payload) async {
final start = DateTime.now();
await Khadem.eventBus.emit(event, payload);
final duration = DateTime.now().difference(start);
if (duration > Duration(milliseconds: 100)) {
Khadem.logger.warn('Slow event: $event took ${duration.inMilliseconds}ms');
}
}
}
// Usage with middleware
Future<void> emitWithMiddleware(String event, dynamic payload) async {
await EventMiddleware.loggingMiddleware(event, payload);
await EventMiddleware.errorHandlingMiddleware(event, payload);
await EventMiddleware.performanceMiddleware(event, payload);
}
// Or create a middleware pipeline
class EventPipeline {
final List<Future<void> Function(String, dynamic)> _middlewares = [];
void addMiddleware(Future<void> Function(String, dynamic) middleware) {
_middlewares.add(middleware);
}
Future<void> emit(String event, dynamic payload) async {
for (final middleware in _middlewares) {
await middleware(event, payload);
}
}
}
Event Sourcing
Track state changes through events
dart
// Event sourcing pattern
class EventStore {
final List<Map<String, dynamic>> _events = [];
Future<void> saveEvent(String eventType, dynamic payload) async {
final event = {
'id': Uuid().v4(),
'type': eventType,
'payload': payload,
'timestamp': DateTime.now(),
'version': _events.length + 1,
};
_events.add(event);
await Khadem.eventBus.emit('event.stored', event);
}
List<Map<String, dynamic>> getEventsForAggregate(String aggregateId) {
return _events.where((event) =>
event['payload']['aggregateId'] == aggregateId
).toList();
}
Future<void> replayEvents() async {
for (final event in _events) {
await Khadem.eventBus.emit(event['type'], event['payload']);
}
}
}
// Event-sourced aggregate
class UserAggregate {
final String id;
String name;
String email;
bool emailVerified = false;
UserAggregate(this.id, this.name, this.email);
Future<void> updateName(String newName) async {
await Khadem.eventBus.emit('user.name_updated', {
'aggregateId': id,
'oldName': name,
'newName': newName,
'timestamp': DateTime.now(),
});
name = newName;
}
Future<void> verifyEmail() async {
await Khadem.eventBus.emit('user.email_verified', {
'aggregateId': id,
'email': email,
'timestamp': DateTime.now(),
});
emailVerified = true;
}
}
// Event handlers for state reconstruction
class UserEventHandlers {
final Map<String, UserAggregate> _users = {};
UserEventHandlers() {
Khadem.eventBus.on('user.name_updated', _onNameUpdated);
Khadem.eventBus.on('user.email_verified', _onEmailVerified);
}
Future<void> _onNameUpdated(dynamic event) async {
final user = _users[event['aggregateId']];
if (user != null) {
user.name = event['newName'];
}
}
Future<void> _onEmailVerified(dynamic event) async {
final user = _users[event['aggregateId']];
if (user != null) {
user.emailVerified = true;
}
}
UserAggregate? getUser(String id) => _users[id];
}
Performance Considerations
Optimization Tips
- • Use async execution for I/O operations
- • Limit the number of listeners per event
- • Use event groups for batch operations
- • Clean up unused listeners regularly
- • Consider memory usage with long-lived listeners
- • Use priorities to optimize execution order
Memory Management
- • Always remove listeners when components are destroyed
- • Use subscribers for automatic cleanup
- • Avoid circular references in event payloads
- • Monitor listener count for memory leaks
- • Use weak references for long-lived objects