Event System

Priority-based event system with subscriber management, groups, and async execution.

Pub-Sub · Priority · Async · Groups · Subscribers

Quick Start

Basic Event Handling

dart
// Register an event listener
Khadem.eventBus.on('user.created', (user) async {
  print('New user created: ${user['name']}');
  await sendWelcomeEmail(user);
});

// Emit an event with payload
await Khadem.eventBus.emit('user.created', {
  'id': 123,
  'name': 'John Doe',
  'email': 'john@example.com'
});

💡 Note: The event bus is available automatically via Khadem.eventBus.

⚡ Tip: Use descriptive event names like 'user.created' or 'order.placed'

Event Priorities

dart
// Register listeners with different priorities
Khadem.eventBus.on('order.processed', (order) async {
  // Critical: Security logging
  await logSecurityEvent('Order processed', order);
}, priority: EventPriority.critical);

Khadem.eventBus.on('order.processed', (order) async {
  // High: Business logic
  await updateInventory(order);
}, priority: EventPriority.high);

Khadem.eventBus.on('order.processed', (order) async {
  // Normal: Notifications
  await sendOrderConfirmation(order);
}, priority: EventPriority.normal);

Khadem.eventBus.on('order.processed', (order) async {
  // Low: Analytics
  await trackAnalytics('order_processed', order);
}, priority: EventPriority.low);

// Emit event - listeners execute in priority order
await Khadem.eventBus.emit('order.processed', orderData);

Priority Levels

  • EventPriority.low - Background tasks, cleanup
  • EventPriority.normal - Standard event handling
  • EventPriority.high - Important business logic
  • EventPriority.critical - Security, logging, monitoring
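
To make the ordering concrete, here is a minimal, self-contained sketch of how a bus could run listeners from critical down to low. It is not Khadem's implementation: MiniBus, Priority, and Listener are hypothetical stand-ins invented for this illustration, and the sketch assumes higher priorities run first, as the example above suggests.

```dart
import 'dart:async';

// Hypothetical stand-ins for illustration; not Khadem's actual types.
enum Priority { low, normal, high, critical }

typedef Listener = FutureOr<void> Function(dynamic payload);

class _Entry {
  _Entry(this.listener, this.priority);
  final Listener listener;
  final Priority priority;
}

class MiniBus {
  final Map<String, List<_Entry>> _listeners = {};

  void on(String event, Listener listener,
      {Priority priority = Priority.normal}) {
    final entries = _listeners.putIfAbsent(event, () => []);
    entries.add(_Entry(listener, priority));
    // Highest priority first. List.sort is not guaranteed to be stable, so
    // listeners with equal priority should not depend on registration order.
    entries.sort((a, b) => b.priority.index.compareTo(a.priority.index));
  }

  Future<void> emit(String event, [dynamic payload]) async {
    // Iterate over a copy so a listener may add or remove listeners safely.
    for (final entry in List.of(_listeners[event] ?? const <_Entry>[])) {
      await entry.listener(payload);
    }
  }
}
```

In this sketch each listener is awaited before the next one starts, so a critical listener finishes before any high-priority listener begins.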

One-Time Listeners

dart
// Register one-time listeners
Khadem.eventBus.once('app.initialized', () async {
  print('App initialized for the first time!');
  await performInitialSetup();
});

Khadem.eventBus.once('database.migrated', (version) async {
  print('Database migrated to version: $version');
  await updateMigrationStatus(version);
});

// Another one-time listener, registered the same way
Khadem.eventBus.once('cache.warmed', () async {
  print('Cache warmed up successfully');
});

// These listeners will be automatically removed after first execution
await Khadem.eventBus.emit('app.initialized');
await Khadem.eventBus.emit('database.migrated', '1.2.0');

Use Cases

  • Application initialization
  • Resource cleanup after first use
  • Setup tasks that should run only once
  • Migration or upgrade handlers
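
One common way to get one-time behaviour is to wrap the listener in a closure that unregisters itself before running. The sketch below shows that idea on a tiny hypothetical bus (OnceBus and Listener are names made up here); how Khadem implements once() internally is not stated in this page.

```dart
import 'dart:async';

typedef Listener = FutureOr<void> Function(dynamic payload);

// Hypothetical minimal bus; it only illustrates one-time semantics.
class OnceBus {
  final Map<String, List<Listener>> _listeners = {};

  void on(String event, Listener listener) =>
      _listeners.putIfAbsent(event, () => []).add(listener);

  void off(String event, Listener listener) =>
      _listeners[event]?.remove(listener);

  void once(String event, Listener listener) {
    late final Listener wrapper;
    wrapper = (payload) async {
      // Unregister before running so a re-entrant emit cannot fire it twice.
      off(event, wrapper);
      await listener(payload);
    };
    on(event, wrapper);
  }

  Future<void> emit(String event, [dynamic payload]) async {
    // Iterate over a copy because once() listeners modify the list.
    for (final listener in List.of(_listeners[event] ?? const <Listener>[])) {
      await listener(payload);
    }
  }
}
```

Emitting the same event a second time finds no wrapper left in the list, so the original listener runs exactly once.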

Event Groups

dart
// Create event groups for related operations
Khadem.eventBus.addToGroup('user.lifecycle', 'user.created');
Khadem.eventBus.addToGroup('user.lifecycle', 'user.updated');
Khadem.eventBus.addToGroup('user.lifecycle', 'user.deleted');

Khadem.eventBus.addToGroup('order.workflow', 'order.placed');
Khadem.eventBus.addToGroup('order.workflow', 'order.confirmed');
Khadem.eventBus.addToGroup('order.workflow', 'order.shipped');

// Register listeners for group events
Khadem.eventBus.on('user.created', (user) async => await logUserActivity(user, 'created'));
Khadem.eventBus.on('user.updated', (user) async => await logUserActivity(user, 'updated'));
Khadem.eventBus.on('user.deleted', (user) async => await logUserActivity(user, 'deleted'));

// Emit entire group at once
await Khadem.eventBus.emitGroup('user.lifecycle', userData);

// Remove events from groups
Khadem.eventBus.removeFromGroup('user.lifecycle', 'user.updated');

Group Benefits

  • Batch emit related events together
  • Organize events by domain or feature
  • Simplify event management
  • Enable/disable entire feature sets
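
Conceptually, a group is a named set of event names, and emitting a group means emitting each member in turn. The following sketch models only the bookkeeping; EventGroups and eventsIn are hypothetical names for this illustration, and Khadem's internal data structure may differ.

```dart
// Hypothetical registry mapping a group name to its member events.
class EventGroups {
  final Map<String, Set<String>> _groups = {};

  // A Set ignores duplicates, so adding the same event twice is harmless.
  void addToGroup(String group, String event) =>
      _groups.putIfAbsent(group, () => <String>{}).add(event);

  void removeFromGroup(String group, String event) {
    final events = _groups[group];
    if (events == null) return;
    events.remove(event);
    // Drop empty groups so they do not linger in the registry.
    if (events.isEmpty) _groups.remove(group);
  }

  // Members in insertion order; an unknown group yields an empty list.
  List<String> eventsIn(String group) =>
      List.unmodifiable(_groups[group] ?? const <String>{});
}
```

A group emit could then be written as a loop over eventsIn(group) that emits each event with the same payload.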

Async Event Execution

dart
// Synchronous execution (default)
Khadem.eventBus.on('file.uploaded', (file) async {
  // This blocks other listeners
  await processFile(file);
  await generateThumbnail(file);
});

// Asynchronous execution (recommended for I/O)
await Khadem.eventBus.emit('file.uploaded', fileData, queue: true);

// Multiple async operations
Khadem.eventBus.on('user.registered', (user) async {
  // These three tasks start immediately and run concurrently via Future.wait
  final emailTask = sendWelcomeEmail(user);
  final profileTask = createUserProfile(user);
  final analyticsTask = trackRegistration(user);

  await Future.wait([emailTask, profileTask, analyticsTask]);
});

// Emit with async processing
await Khadem.eventBus.emit('user.registered', userData, queue: true);

When to Use Async

  • I/O operations (database, network calls)
  • Heavy computations
  • Multiple independent operations
  • Non-blocking event processing
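
The difference between blocking and non-blocking listener execution can be shown with plain Dart futures. This sketch does not use Khadem or its queue option; runSequentially and runConcurrently are hypothetical helpers that only contrast the two scheduling styles.

```dart
import 'dart:async';

typedef Listener = Future<void> Function(dynamic payload);

// Runs listeners one after another; total time is the sum of all listeners.
Future<void> runSequentially(List<Listener> listeners, dynamic payload) async {
  for (final listener in listeners) {
    await listener(payload);
  }
}

// Starts every listener right away and waits for all of them; total time is
// roughly that of the slowest listener. Future.wait completes with an error
// if any listener fails.
Future<void> runConcurrently(List<Listener> listeners, dynamic payload) async {
  await Future.wait(listeners.map((listener) => listener(payload)));
}
```

Concurrent execution suits independent I/O-bound listeners. Listeners that depend on each other's side effects still need sequential execution.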

Event Subscribers

dart
// Create an event subscriber class
class UserEventSubscriber implements EventSubscriberInterface {
  @override
  List<EventMethod> getEventHandlers() => [
    EventMethod(
      eventName: 'user.created',
      handler: _onUserCreated,
      priority: EventPriority.high,
    ),
    EventMethod(
      eventName: 'user.updated',
      handler: _onUserUpdated,
      priority: EventPriority.normal,
    ),
    EventMethod(
      eventName: 'user.deleted',
      handler: _onUserDeleted,
      priority: EventPriority.normal,
      once: true, // Only handle first deletion
    ),
  ];

  Future<void> _onUserCreated(dynamic user) async {
    await sendWelcomeEmail(user);
    await createUserProfile(user);
  }

  Future<void> _onUserUpdated(dynamic user) async {
    await updateUserCache(user);
    await logUserChange(user);
  }

  Future<void> _onUserDeleted(dynamic user) async {
    await cleanupUserData(user);
    await logUserDeletion(user);
  }
}

// Register the subscriber
final subscriber = UserEventSubscriber();
registerSubscribers([subscriber]);

// Remove all of the subscriber's listeners in one call when it is no longer needed
Khadem.eventBus.offSubscriber(subscriber);

Subscriber Benefits

  • Group related event handlers in one class
  • Remove all of a subscriber's listeners with a single offSubscriber call
  • Better code organization
  • Easier testing and debugging

Event Broadcasting

dart
// Broadcast events to all connected clients
Khadem.eventBus.on('notification.sent', (notification) async {
  // Local processing
  await saveNotification(notification);
});

// Emit and broadcast to WebSocket clients
await Khadem.eventBus.emit('notification.sent', notificationData, broadcast: true);

// Real-time features
Khadem.eventBus.on('live.update', (update) async {
  // Process live update
  await applyLiveUpdate(update);
});

// Broadcast live updates to all clients
await Khadem.eventBus.emit('live.update', updateData, broadcast: true);

// Group broadcasting
await Khadem.eventBus.emitGroup('user.updates', userData, broadcast: true);

Broadcast Use Cases

  • Real-time notifications
  • Live updates across clients
  • Distributed system coordination
  • Multi-instance synchronization

Event Management

dart
// Remove specific listener
void removeListener() {
  Khadem.eventBus.off('user.created', myHandler);
}

// Remove all listeners for an event
void clearEventListeners() {
  Khadem.eventBus.offEvent('user.created');
}

// Remove all listeners for a subscriber (call this from inside the subscriber class, where `this` is valid)
void cleanupSubscriber() {
  Khadem.eventBus.offSubscriber(this);
}

// Clear everything (use with caution)
void resetEventSystem() {
  Khadem.eventBus.clear();
}

// Check if event has listeners
if (Khadem.eventBus.hasListeners('user.created')) {
  print('Event has active listeners');
}

// Get listener count
final count = Khadem.eventBus.listenerCount('user.created');
print('Listeners for user.created: $count');

dart
// Inspect registered listeners
final listeners = Khadem.eventBus.listeners;
for (final entry in listeners.entries) {
  print('Event: ${entry.key}');
  print('Listeners: ${entry.value.length}');
}

// Inspect event groups
final groups = Khadem.eventBus.eventGroups;
for (final entry in groups.entries) {
  print('Group: ${entry.key}');
  print('Events: ${entry.value.join(', ')}');
}

// Inspect subscriber events
final subscriberEvents = Khadem.eventBus.subscriberEvents;
for (final entry in subscriberEvents.entries) {
  print('Subscriber: ${entry.key}');
  print('Events: ${entry.value.join(', ')}');
}

// Debug event system state
void debugEventSystem() {
  print('=== Event System Debug ===');
  print('Total events: ${listeners.length}');
  print('Total groups: ${groups.length}');
  print('Total subscribers: ${subscriberEvents.length}');

  for (final entry in listeners.entries) {
    print('${entry.key}: ${entry.value.length} listeners');
  }
}

Management Operations

  • off(event, listener) - Remove specific listener
  • offEvent(event) - Remove all listeners for event
  • offSubscriber(subscriber) - Remove subscriber's events
  • clear() - Remove all listeners and groups

Error Handling

dart
// Proper error handling in listeners
Khadem.eventBus.on('user.created', (user) async {
  try {
    await sendWelcomeEmail(user);
    await createUserProfile(user);
    await logUserCreation(user);
  } catch (e, stackTrace) {
    // Log error but don't rethrow
    Khadem.logger.error('Failed to process user creation', error: e, stackTrace: stackTrace);

    // Optionally emit error event
    await Khadem.eventBus.emit('user.creation.failed', {
      'user': user,
      'error': e.toString(),
      'timestamp': DateTime.now(),
    });
  }
});

// Error event handler
Khadem.eventBus.on('user.creation.failed', (errorData) async {
  // Handle failed user creation
  await notifyAdmin(errorData);
  await cleanupPartialData(errorData['user']);
});

// Async error handling
Khadem.eventBus.on('file.processed', (file) async {
  try {
    final result = await processFileAsync(file);
    await Khadem.eventBus.emit('file.processing.success', result);
  } on FileProcessingException catch (e) {
    await Khadem.eventBus.emit('file.processing.error', {
      'file': file,
      'error': e.message,
    });
  } catch (e) {
    await Khadem.eventBus.emit('file.processing.unexpected_error', {
      'file': file,
      'error': e.toString(),
    });
  }
});

Error Handling Best Practices

  • Always wrap listener logic in try-catch blocks
  • Log errors but don't rethrow to avoid disrupting other listeners
  • Use async error handling for complex operations
  • Consider using error events for centralized error handling
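
Rather than repeating try-catch in every listener, the same policy can be applied with a small wrapper. This is a sketch under assumptions: guarded, Listener, and ErrorSink are hypothetical helpers, not part of Khadem, and the error sink stands in for whatever logger or error event you use.

```dart
import 'dart:async';

typedef Listener = FutureOr<void> Function(dynamic payload);
typedef ErrorSink = void Function(Object error, StackTrace stackTrace);

// Wraps a listener so that any error is reported to onError and swallowed,
// which keeps one failing listener from disrupting the others.
Listener guarded(Listener listener, ErrorSink onError) {
  return (payload) async {
    try {
      await listener(payload);
    } catch (error, stackTrace) {
      onError(error, stackTrace);
    }
  };
}
```

A listener would then be registered as guarded(myListener, reportError), where myListener and reportError are your own functions.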

Best Practices

✅ Do's

  • Use descriptive, namespaced event names
  • Handle errors gracefully in listeners
  • Use appropriate priorities for critical operations
  • Clean up listeners when components are destroyed
  • Use event groups for related operations
  • Document custom events and their payloads
  • Use subscribers for complex event handling
  • Test event interactions thoroughly

❌ Don'ts

  • Don't perform heavy operations in sync listeners
  • Don't rely on event execution order (except priorities)
  • Don't forget to clean up listeners in long-lived objects
  • Don't use events for simple method calls
  • Don't emit an event from its own listeners, directly or through a chain of other events (can cause infinite loops)
  • Don't ignore errors in event listeners
  • Don't use anonymous functions if you need to remove them
  • Don't block the event loop with sync operations

Common Patterns

Domain Event Pattern

Organize events by business domain

dart
// Domain-driven event organization
class UserDomainEvents {
  static const String created = 'user.domain.created';
  static const String updated = 'user.domain.updated';
  static const String deleted = 'user.domain.deleted';
  static const String passwordChanged = 'user.domain.password_changed';
  static const String emailVerified = 'user.domain.email_verified';
}

class OrderDomainEvents {
  static const String placed = 'order.domain.placed';
  static const String confirmed = 'order.domain.confirmed';
  static const String shipped = 'order.domain.shipped';
  static const String delivered = 'order.domain.delivered';
  static const String cancelled = 'order.domain.cancelled';
}

// Usage with domain events
Khadem.eventBus.on(UserDomainEvents.created, (user) async {
  await sendWelcomeEmail(user);
  await createUserProfile(user);
});

Khadem.eventBus.on(OrderDomainEvents.placed, (order) async {
  await reserveInventory(order);
  await processPayment(order);
});

// Group domain events
Khadem.eventBus.addToGroup('user.domain', UserDomainEvents.created);
Khadem.eventBus.addToGroup('user.domain', UserDomainEvents.updated);
Khadem.eventBus.addToGroup('user.domain', UserDomainEvents.deleted);

Khadem.eventBus.addToGroup('order.lifecycle', OrderDomainEvents.placed);
Khadem.eventBus.addToGroup('order.lifecycle', OrderDomainEvents.confirmed);
Khadem.eventBus.addToGroup('order.lifecycle', OrderDomainEvents.shipped);

Observer Pattern

Decouple components with events

dart
// Observer pattern with events
class UserService {
  Future<void> createUser(Map<String, dynamic> userData) async {
    final user = await _saveUserToDatabase(userData);
    await Khadem.eventBus.emit('user.created', user);
  }

  Future<void> updateUser(int userId, Map<String, dynamic> updates) async {
    final user = await _updateUserInDatabase(userId, updates);
    await Khadem.eventBus.emit('user.updated', user);
  }
}

class EmailService {
  EmailService() {
    // Subscribe to user events
    Khadem.eventBus.on('user.created', _sendWelcomeEmail);
    Khadem.eventBus.on('user.updated', _sendUpdateNotification);
  }

  Future<void> _sendWelcomeEmail(dynamic user) async {
    await sendEmail(user['email'], 'Welcome!', 'Welcome to our platform');
  }

  Future<void> _sendUpdateNotification(dynamic user) async {
    await sendEmail(user['email'], 'Profile Updated', 'Your profile has been updated');
  }
}

class AnalyticsService {
  AnalyticsService() {
    Khadem.eventBus.on('user.created', _trackRegistration);
    Khadem.eventBus.on('user.updated', _trackProfileUpdate);
  }

  Future<void> _trackRegistration(dynamic user) async {
    await trackEvent('user_registration', {'user_id': user['id']});
  }

  Future<void> _trackProfileUpdate(dynamic user) async {
    await trackEvent('profile_update', {'user_id': user['id']});
  }
}

// Usage
final userService = UserService();
final emailService = EmailService(); // Automatically subscribes
final analyticsService = AnalyticsService(); // Automatically subscribes

await userService.createUser(userData); // Triggers both services

Event Middleware

Add cross-cutting concerns

dart
// Event middleware for cross-cutting concerns
class EventMiddleware {
  static Future<void> loggingMiddleware(String event, dynamic payload) async {
    Khadem.logger.info('Event emitted: $event', extra: {'payload': payload});
    await Khadem.eventBus.emit(event, payload);
    Khadem.logger.info('Event completed: $event');
  }

  static Future<void> errorHandlingMiddleware(String event, dynamic payload) async {
    try {
      await Khadem.eventBus.emit(event, payload);
    } catch (e, stackTrace) {
      Khadem.logger.error('Event failed: $event', error: e, stackTrace: stackTrace);
      await Khadem.eventBus.emit('event.error', {
        'event': event,
        'payload': payload,
        'error': e.toString(),
        'timestamp': DateTime.now(),
      });
    }
  }

  static Future<void> performanceMiddleware(String event, dynamic payload) async {
    // Stopwatch is monotonic; DateTime.now() can jump if the system clock changes
    final stopwatch = Stopwatch()..start();
    await Khadem.eventBus.emit(event, payload);
    stopwatch.stop();

    if (stopwatch.elapsedMilliseconds > 100) {
      Khadem.logger.warn('Slow event: $event took ${stopwatch.elapsedMilliseconds}ms');
    }
  }
}

// Usage with middleware
// Note: each middleware above calls emit() itself, so awaiting all three in
// sequence would emit the event three times. Call one middleware per emission.
Future<void> emitWithMiddleware(String event, dynamic payload) async {
  await EventMiddleware.loggingMiddleware(event, payload);
}

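// Or create a middleware pipeline.
// Middlewares added here should only observe or validate the event and must
// not emit it themselves; the pipeline emits it once after they have all run.
class EventPipeline {
  final List<Future<void> Function(String, dynamic)> _middlewares = [];

  void addMiddleware(Future<void> Function(String, dynamic) middleware) {
    _middlewares.add(middleware);
  }

  Future<void> emit(String event, dynamic payload) async {
    for (final middleware in _middlewares) {
      await middleware(event, payload);
    }
    await Khadem.eventBus.emit(event, payload);
  }
}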
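
When several concerns need to run both before and after a single emission, a common alternative is to give each middleware a next callback and compose them into one function. The sketch below is independent of Khadem; Emit, Middleware, and compose are hypothetical names, and the innermost emit function is supplied by the caller.

```dart
typedef Emit = Future<void> Function(String event, dynamic payload);
typedef Middleware = Future<void> Function(
    String event, dynamic payload, Emit next);

// Builds a single Emit in which the first middleware is the outermost layer
// and the supplied emit function runs exactly once at the centre.
Emit compose(List<Middleware> middlewares, Emit emit) {
  var chain = emit;
  for (final middleware in middlewares.reversed) {
    final next = chain; // capture the chain built so far
    chain = (event, payload) => middleware(event, payload, next);
  }
  return chain;
}
```

With Khadem, the innermost function could be a closure that forwards to Khadem.eventBus.emit, so logging, timing, and error handling all wrap the same single emission.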

Event Sourcing

Track state changes through events

dart
// Event sourcing pattern (Uuid is provided by the uuid package)
import 'package:uuid/uuid.dart';

class EventStore {
  final List<Map<String, dynamic>> _events = [];

  Future<void> saveEvent(String eventType, dynamic payload) async {
    final event = {
      'id': Uuid().v4(),
      'type': eventType,
      'payload': payload,
      'timestamp': DateTime.now(),
      'version': _events.length + 1,
    };

    _events.add(event);
    await Khadem.eventBus.emit('event.stored', event);
  }

  List<Map<String, dynamic>> getEventsForAggregate(String aggregateId) {
    return _events.where((event) =>
      event['payload']['aggregateId'] == aggregateId
    ).toList();
  }

  Future<void> replayEvents() async {
    for (final event in _events) {
      await Khadem.eventBus.emit(event['type'], event['payload']);
    }
  }
}

// Event-sourced aggregate
class UserAggregate {
  final String id;
  String name;
  String email;
  bool emailVerified = false;

  UserAggregate(this.id, this.name, this.email);

  Future<void> updateName(String newName) async {
    await Khadem.eventBus.emit('user.name_updated', {
      'aggregateId': id,
      'oldName': name,
      'newName': newName,
      'timestamp': DateTime.now(),
    });
    name = newName;
  }

  Future<void> verifyEmail() async {
    await Khadem.eventBus.emit('user.email_verified', {
      'aggregateId': id,
      'email': email,
      'timestamp': DateTime.now(),
    });
    emailVerified = true;
  }
}

// Event handlers for state reconstruction
class UserEventHandlers {
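  final Map<String, UserAggregate> _users = {};

  // Aggregates must be registered here before their events can be applied
  void register(UserAggregate user) => _users[user.id] = user;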

  UserEventHandlers() {
    Khadem.eventBus.on('user.name_updated', _onNameUpdated);
    Khadem.eventBus.on('user.email_verified', _onEmailVerified);
  }

  Future<void> _onNameUpdated(dynamic event) async {
    final user = _users[event['aggregateId']];
    if (user != null) {
      user.name = event['newName'];
    }
  }

  Future<void> _onEmailVerified(dynamic event) async {
    final user = _users[event['aggregateId']];
    if (user != null) {
      user.emailVerified = true;
    }
  }

  UserAggregate? getUser(String id) => _users[id];
}
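
The core of event sourcing is that current state can be rebuilt by folding the stored events in order. The sketch below shows that reduction with an immutable state object, reusing the event names and payload keys from the example above; UserState, apply, and replay are hypothetical names, and the event map shape follows the EventStore entries ('type' and 'payload').

```dart
class UserState {
  const UserState({this.name = '', this.emailVerified = false});
  final String name;
  final bool emailVerified;
}

// Pure function: returns the state that results from applying one event.
UserState apply(UserState state, Map<String, dynamic> event) {
  switch (event['type']) {
    case 'user.name_updated':
      return UserState(
        name: event['payload']['newName'] as String,
        emailVerified: state.emailVerified,
      );
    case 'user.email_verified':
      return UserState(name: state.name, emailVerified: true);
    default:
      return state; // unknown events leave the state unchanged
  }
}

// Rebuilds the state from scratch by applying every event in order.
UserState replay(Iterable<Map<String, dynamic>> events) =>
    events.fold(const UserState(), apply);
```

Because apply has no side effects, replaying the same events always produces the same state, which keeps reconstruction easy to test.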

Performance Considerations

Optimization Tips

  • Use async execution for I/O operations
  • Limit the number of listeners per event
  • Use event groups for batch operations
  • Clean up unused listeners regularly
  • Consider memory usage with long-lived listeners
  • Use priorities to optimize execution order

Memory Management

  • Always remove listeners when components are destroyed
  • Use subscribers for automatic cleanup
  • Avoid circular references in event payloads
  • Monitor listener count for memory leaks
  • Use weak references for long-lived objects
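
One way to make listener cleanup hard to forget is to collect a cancel callback for every registration and run them all when the component is disposed. This is a sketch only; SubscriptionBag is a hypothetical helper, not part of Khadem.

```dart
// Collects cancel callbacks so a component can release all of its
// listeners in one place.
class SubscriptionBag {
  final List<void Function()> _cancellers = [];

  void add(void Function() cancel) => _cancellers.add(cancel);

  // Runs every cancel callback once; calling dispose again is a no-op.
  void dispose() {
    for (final cancel in _cancellers) {
      cancel();
    }
    _cancellers.clear();
  }
}
```

With the API shown earlier, a component could call bag.add(() => Khadem.eventBus.off('user.created', handler)) right after registering handler, then call bag.dispose() in its own teardown.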
