WebSocket System

A powerful real-time communication system for building interactive applications with WebSocket support, middleware pipeline, and room-based messaging.

Real-time Communication · Room-based Messaging · Middleware Pipeline · Event-driven Architecture

System Overview

WebSocket Architecture

Khadem's WebSocket system is built around a modular architecture with clear separation of concerns:

Core Components

  • SocketServer: Main entry point for WebSocket connections
  • SocketManager: Manages clients, rooms, and event routing
  • SocketClient: Represents individual WebSocket connections
  • SocketHandler: Processes incoming messages and executes middleware
  • SocketMiddlewarePipeline: Executes middleware in priority order

Key Features

  • Event-driven: Register handlers for specific events
  • Room-based: Group clients and broadcast to rooms
  • Middleware: Connection, message, room, and disconnect middleware
  • Authentication: Built-in auth callbacks and user context
  • Type-safe: Strongly typed event handlers and middleware

Message Format

All WebSocket messages use JSON format with event and data fields:

{
  "event": "send_message",
  "data": {
    "message": "Hello World",
    "room": "general"
  }
}

Quick Start

Basic Server Setup

1. Create WebSocket Server

dart
import 'package:khadem/khadem_dart.dart';

/// Connection-level entry point for newly opened WebSocket clients.
class WebSocketController {
  /// Logs the new connection and greets [client] with a `welcome` event.
  ///
  /// [request] is only read for the remote IP shown in the log line.
  static Future handleConnection(SocketClient client, Request request) async {
    print('New WebSocket connection: ${client.id} from ${request.ip}');

    // Build the greeting first, then push it to the client in one call.
    final greeting = <String, dynamic>{
      'event': 'welcome',
      'message': 'Welcome to Khadem WebSocket server!',
      'timestamp': DateTime.now().toIso8601String(),
    };
    client.sendJson(greeting);
  }
}

// Setup WebSocket server
/// Builds a [SocketServer] with an auth callback, one connection middleware
/// and the `ping`/`echo` handlers, then mounts it on [server] at `/ws`.
void setupWebSocketServer(Server server) {
  final sockets = SocketServer();

  // Authentication: map the `token` query parameter to a user ID.
  // Yields null when no token is supplied or no claims come back.
  sockets.useAuth((client, request) async {
    final token = request.query['token'];
    if (token == null) return null;

    final claims = await verifyJWTToken(token);
    return claims?['sub'];
  });

  // Connection middleware: log the caller's IP and always accept.
  sockets.use((client, request) async {
    print('Connection middleware: ${request.ip}');
    return true;
  });

  // ping -> pong
  sockets.on('ping', (client, data) async {
    final reply = <String, dynamic>{
      'event': 'pong',
      'timestamp': DateTime.now().toIso8601String(),
    };
    client.sendJson(reply);
  });

  // echo -> echo; payloads without a `message` field are ignored.
  sockets.on('echo', (client, data) async {
    final message = data['message'];
    if (message == null) return;

    client.sendJson({
      'event': 'echo',
      'message': message,
      'timestamp': DateTime.now().toIso8601String(),
    });
  });

  // Lifecycle logging.
  sockets.onConnect((client) async {
    print('Client connected: ${client.id}');
  });
  sockets.onDisconnect((client) async {
    print('Client disconnected: ${client.id}');
  });

  // Hand every upgrade on /ws over to the socket server.
  server.websocket('/ws', (socket, request) async {
    await sockets.handleConnection(socket, request);
  });
}

// Main server setup
/// Entry point: creates the HTTP [Server], attaches the WebSocket routes and
/// starts listening on port 8080.
void main() async {
  final httpServer = Server();

  // Register the /ws endpoint before the server starts accepting traffic.
  setupWebSocketServer(httpServer);

  await httpServer.listen(8080);
  print('Server running on port 8080 with WebSocket support');
}

// Helper function for JWT verification
/// Placeholder JWT verifier used by the examples.
///
/// WARNING: [token] is never inspected here; every call yields the same
/// hard-coded claims. Replace the body with real verification before use.
///
/// Returns the claims map, or null if verification throws.
Future<Map<String, dynamic>?> verifyJWTToken(String token) async {
  try {
    // Implement your JWT verification logic here.
    final placeholderClaims = <String, dynamic>{
      'sub': 'user123',
      'username': 'john_doe',
    };
    return placeholderClaims;
  } catch (_) {
    return null;
  }
}

2. Handle Events

dart
/// Registers the example event and lifecycle handlers on a [SocketServer].
class WebSocketController {
  /// Wires the `ping`, `echo` and `broadcast` events plus the
  /// connect/disconnect callbacks onto [server].
  static Future handleConnection(SocketServer server) async {
    // Every outgoing payload is stamped with the send time (ISO-8601).
    String timestamp() => DateTime.now().toIso8601String();

    // ping -> pong
    server.on('ping', (SocketClient client, Map<String, dynamic> data) async {
      print('Received ping from ${client.id}');
      client.sendJson({'event': 'pong', 'timestamp': timestamp()});
    });

    // echo -> echo_response carrying the original text back to the sender
    server.on('echo', (SocketClient client, Map<String, dynamic> data) async {
      final message = data['message'];
      print('Received echo from ${client.id}: $message');

      client.sendJson({
        'event': 'echo_response',
        'original_message': message,
        'timestamp': timestamp(),
      });
    });

    // broadcast -> broadcast_message relayed through the server
    server.on('broadcast',
        (SocketClient client, Map<String, dynamic> data) async {
      final message = data['message'];
      print('Broadcast request from ${client.id}: $message');

      final outgoing = <String, dynamic>{
        'event': 'broadcast_message',
        'from': client.get('user_id'),
        'message': message,
        'timestamp': timestamp(),
      };
      await server.broadcast(outgoing);
    });

    // Lifecycle: confirm the connection to the client, log the disconnect.
    server.onConnect((SocketClient client) async {
      print('Client connected: ${client.id}');

      client.sendJson({
        'event': 'connected',
        'client_id': client.id,
        'timestamp': timestamp(),
      });
    });

    server.onDisconnect((SocketClient client) async {
      print('Client disconnected: ${client.id}');
    });
  }
}

// Usage
/// Registers the [WebSocketController] handlers on [server].
///
/// The future returned by the controller is not awaited here.
void setupWebSocketEvents(SocketServer server) =>
    WebSocketController.handleConnection(server);

3. Start Server

dart
import 'package:khadem/khadem_dart.dart';

/// Starts the example application: an HTTP [Server] on port 8080 with a
/// WebSocket endpoint mounted at `/ws` and a landing page at `/`.
///
/// Registers an auth callback plus the `ping`, `join_room` and
/// `send_message` handlers on a [SocketServer] before listening.
void main() async {
  // Create HTTP server
  final server = Server();

  // Create WebSocket server
  final socketServer = SocketServer();

  // Setup authentication: map the `token` query parameter to a user ID.
  socketServer.useAuth((client, request) async {
    final token = request.query['token'];
    if (token == null) return null;

    // Verify token and return user ID
    final payload = await verifyJWTToken(token);
    return payload?['sub'];
  });

  // Setup event handlers
  socketServer.on('ping', (client, data) async {
    client.sendJson({'event': 'pong'});
  });

  // join_room: requests without a `room` field are ignored.
  socketServer.on('join_room', (client, data) async {
    final roomName = data['room'];
    if (roomName != null) {
      await client.joinRoom(roomName);
      client.sendJson({
        'event': 'room_joined',
        'room': roomName
      });
    }
  });

  // send_message: relay to the room, skipping the sender itself.
  socketServer.on('send_message', (client, data) async {
    final roomName = data['room'];
    final message = data['message'];

    if (roomName != null && message != null) {
      await socketServer.broadcastToRoom(roomName, {
        'event': 'message',
        'from': client.get('user_id'),
        'message': message,
        'timestamp': DateTime.now().toIso8601String()
      }, exclude: client);
    }
  });

  // Mount WebSocket server
  server.websocket('/ws', (socket, request) async {
    await socketServer.handleConnection(socket, request);
  });

  // Add HTTP routes
  server.get('/', (request) async {
    // Plain HTML only: this page is sent to the browser as-is, so the client
    // example is inlined in <pre><code> (with "=>" escaped as "=&gt;")
    // rather than using a docs-site template component.
    final html = '''
      <!DOCTYPE html>
      <html>
        <head>
          <title>Khadem WebSocket Server</title>
        </head>
        <body>
          <h1>Khadem WebSocket Server</h1>
          <p>WebSocket endpoint: <code>ws://localhost:8080/ws</code></p>
          <h2>Basic WebSocket Client Example</h2>
          <pre><code>const ws = new WebSocket('ws://localhost:8080/ws?token=YOUR_TOKEN');
ws.onopen = () =&gt; ws.send(JSON.stringify({ event: 'ping', data: {} }));
ws.onmessage = (e) =&gt; console.log(JSON.parse(e.data));</code></pre>
        </body>
      </html>
    ''';
    return Response.html(html);
  });

  // Start server
  await server.listen(8080);
  print('Server running on http://localhost:8080');
  print('WebSocket endpoint: ws://localhost:8080/ws');
}

// Helper function for JWT verification
/// Placeholder JWT verifier used by the examples.
///
/// WARNING: [token] is never inspected here; every call yields the same
/// hard-coded claims. Replace the body with real verification before use.
///
/// Returns the claims map, or null if verification throws.
Future<Map<String, dynamic>?> verifyJWTToken(String token) async {
  try {
    // Implement your JWT verification logic here.
    final placeholderClaims = <String, dynamic>{
      'sub': 'user123',
      'username': 'john_doe',
    };
    return placeholderClaims;
  } catch (_) {
    return null;
  }
}

Room-based Messaging

Room Management

dart
// Room management using SocketServer and SocketManager
/// Room-based chat wiring.
///
/// Registers the join/leave/message handlers on [server] and uses [manager]
/// for room broadcasts and membership lookups.
class ChatServer {
  final SocketServer server;
  final SocketManager manager;

  /// Stores both collaborators and registers the handlers immediately.
  ChatServer(this.server, this.manager) {
    setupEventHandlers();
  }

  /// Registers the `join_room`, `leave_room` and `room_message` handlers.
  void setupEventHandlers() {
    server.on('join_room', _onJoinRoom);
    server.on('leave_room', _onLeaveRoom);
    server.on('room_message', _onRoomMessage);
  }

  /// Adds [client] to the requested room, announces the arrival to the other
  /// members and sends the joiner the current roster.
  Future<void> _onJoinRoom(
      SocketClient client, Map<String, dynamic> data) async {
    final room = data['room'] as String?;
    if (room == null) return;

    try {
      await client.joinRoom(room);

      // Announce to everyone else in the room.
      final announcement = <String, dynamic>{
        'event': 'user_joined',
        'user_id': client.get('user_id'),
        'username': client.get('username'),
        'room': room,
        'timestamp': DateTime.now().toIso8601String(),
      };
      await manager.broadcastToRoom(room, announcement, exclude: client);

      // Reply to the joiner with the member count and roster.
      final members = manager.getClientsInRoom(room);
      final memberCount = members.length;
      final roster = members
          .map((member) => {
                'user_id': member.get('user_id'),
                'username': member.get('username'),
              })
          .toList();
      client.sendJson({
        'event': 'room_joined',
        'room': room,
        'user_count': memberCount,
        'users': roster,
      });
    } catch (e) {
      client.sendJson({
        'event': 'error',
        'message': 'Failed to join room: ${e.toString()}'
      });
    }
  }

  /// Removes [client] from the requested room and tells the room about it.
  Future<void> _onLeaveRoom(
      SocketClient client, Map<String, dynamic> data) async {
    final room = data['room'] as String?;
    if (room == null) return;

    try {
      await client.leaveRoom(room);

      final notice = <String, dynamic>{
        'event': 'user_left',
        'user_id': client.get('user_id'),
        'username': client.get('username'),
        'room': room,
        'timestamp': DateTime.now().toIso8601String(),
      };
      await manager.broadcastToRoom(room, notice);
    } catch (e) {
      client.sendJson({
        'event': 'error',
        'message': 'Failed to leave room: ${e.toString()}'
      });
    }
  }

  /// Relays a chat message to the other members of a room the sender is in.
  Future<void> _onRoomMessage(
      SocketClient client, Map<String, dynamic> data) async {
    final room = data['room'] as String?;
    final text = data['message'] as String?;
    if (room == null || text == null) return;

    // Only members may post to a room.
    if (!client.isInRoom(room)) {
      client.sendJson({
        'event': 'error',
        'message': 'You are not in this room'
      });
      return;
    }

    final outgoing = <String, dynamic>{
      'event': 'room_message',
      'user_id': client.get('user_id'),
      'username': client.get('username'),
      'message': text,
      'room': room,
      'timestamp': DateTime.now().toIso8601String(),
    };
    await manager.broadcastToRoom(room, outgoing, exclude: client);
  }
}

// Usage
/// Entry point for the chat example: wires a [ChatServer], installs the auth
/// callback and starts the socket server on port 8080.
void main() async {
  final socketServer = SocketServer();
  final socketManager = SocketManager();

  // Constructing ChatServer registers its event handlers as a side effect.
  final chatServer = ChatServer(socketServer, socketManager);

  // Authentication: map the `token` query parameter to a user ID.
  socketServer.useAuth((client, request) async {
    final token = request.query['token'];
    if (token == null) return null;

    final claims = await verifyJWTToken(token);
    return claims?['sub'];
  });

  await socketServer.listen(8080);
  print('WebSocket server running on port 8080');
}

Rooms allow you to group clients and broadcast messages to specific groups.

Broadcasting

dart
// Broadcasting using SocketManager
/// Convenience wrapper around [SocketManager] for the common fan-out
/// patterns (everyone, one room, one user, several users) plus statistics.
class BroadcastingService {
  final SocketManager manager;

  BroadcastingService(this.manager);

  /// Sends [message] to every connected client.
  Future<void> broadcastToAll(Map<String, dynamic> message) async {
    await manager.broadcast(message);
  }

  /// Sends [message] to the clients in [roomName], optionally skipping
  /// [exclude].
  Future<void> broadcastToRoom(
    String roomName,
    Map<String, dynamic> message, {
    SocketClient? exclude,
  }) async {
    await manager.broadcastToRoom(roomName, message, exclude: exclude);
  }

  /// Sends [message] to the client registered for [userId]; does nothing when
  /// the manager has no such client.
  Future<void> broadcastToUser(
      String userId, Map<String, dynamic> message) async {
    manager.getClientByUserId(userId)?.sendJson(message);
  }

  /// Sends [message] to each user in [userIds], one after another.
  Future<void> broadcastToUsers(
      List<String> userIds, Map<String, dynamic> message) async {
    for (final id in userIds) {
      await broadcastToUser(id, message);
    }
  }

  /// Sends [message] to every client that has a `user_id` not listed in
  /// [excludeUserIds]. Clients without a `user_id` are skipped as well.
  Future<void> broadcastExcept(
      List<String> excludeUserIds, Map<String, dynamic> message) async {
    for (final candidate in manager.getAllClients()) {
      final id = candidate.get('user_id');
      if (id == null || excludeUserIds.contains(id)) continue;
      candidate.sendJson(message);
    }
  }

  /// Returns the room name, member count and member user IDs for [roomName].
  Map<String, dynamic> getRoomStats(String roomName) {
    final members = manager.getClientsInRoom(roomName);
    final memberCount = members.length;
    return {
      'room': roomName,
      'user_count': memberCount,
      'users': [for (final member in members) member.get('user_id')],
    };
  }

  /// Returns the client and room totals together with per-room statistics.
  Map<String, dynamic> getServerStats() {
    final clientTotal = manager.getTotalClients();
    final roomTotal = manager.getTotalRooms();
    return {
      'total_clients': clientTotal,
      'total_rooms': roomTotal,
      'rooms': [for (final room in manager.getAllRooms()) getRoomStats(room)],
    };
  }
}

// Usage example
/// Registers the `broadcast` and `get_stats` handlers on [server], backed by
/// a [BroadcastingService] over [manager].
void setupBroadcasting(SocketServer server, SocketManager manager) {
  final broadcaster = BroadcastingService(manager);

  // `broadcast`: fan a message out according to `target.type`
  // ('all', 'room' or 'user'); any other type is silently ignored.
  server.on('broadcast',
      (SocketClient client, Map<String, dynamic> data) async {
    final target = data['target'];
    final message = data['message'];
    if (target == null || message == null) return;

    try {
      final kind = target['type'];

      if (kind == 'all') {
        await broadcaster.broadcastToAll({
          'event': 'broadcast',
          'from': client.get('user_id'),
          'message': message,
        });
      } else if (kind == 'room') {
        final roomName = target['room'];
        if (roomName != null) {
          await broadcaster.broadcastToRoom(
            roomName,
            {
              'event': 'room_broadcast',
              'from': client.get('user_id'),
              'message': message,
            },
            exclude: client,
          );
        }
      } else if (kind == 'user') {
        final userId = target['user_id'];
        if (userId != null) {
          await broadcaster.broadcastToUser(userId, {
            'event': 'direct_message',
            'from': client.get('user_id'),
            'message': message,
          });
        }
      }
    } catch (e) {
      // Report any failure (including a malformed target) to the requester.
      client.sendJson({
        'event': 'error',
        'message': 'Broadcast failed: ${e.toString()}'
      });
    }
  });

  // `get_stats`: reply with the current server statistics.
  server.on('get_stats',
      (SocketClient client, Map<String, dynamic> data) async {
    client.sendJson({
      'event': 'stats',
      'data': broadcaster.getServerStats(),
    });
  });
}

Send messages to all clients in a room or to individual clients.

Event Subscription System

dart
// Event subscription system using SocketServer
/// In-memory publish/subscribe layer on top of a [SocketServer].
///
/// Clients opt in to named events via `subscribe`/`unsubscribe`; messages
/// published with [broadcastToEvent] reach only that event's subscribers.
class EventSubscriptionService {
  final SocketServer server;

  /// Event name -> clients currently subscribed to it.
  final Map<String, Set<SocketClient>> _eventSubscribers = {};

  /// Stores [server] and registers the handlers immediately.
  EventSubscriptionService(this.server) {
    setupEventHandlers();
  }

  /// Registers the subscription handlers and the disconnect cleanup.
  void setupEventHandlers() {
    server.on('subscribe', _onSubscribe);
    server.on('unsubscribe', _onUnsubscribe);
    server.on('broadcast_event', _onBroadcastEvent);
    server.onDisconnect(_onDisconnect);
  }

  /// Adds [client] to the subscriber set of `data['event']` and confirms it.
  Future<void> _onSubscribe(
      SocketClient client, Map<String, dynamic> data) async {
    final event = data['event'] as String?;
    if (event == null) return;

    (_eventSubscribers[event] ??= <SocketClient>{}).add(client);

    client.sendJson({
      'event': 'subscribed',
      'subscription_event': event,
      'timestamp': DateTime.now().toIso8601String(),
    });

    print('Client ${client.id} subscribed to event: $event');
  }

  /// Removes [client] from the subscriber set of `data['event']`, dropping
  /// the set once it is empty, and confirms the removal.
  Future<void> _onUnsubscribe(
      SocketClient client, Map<String, dynamic> data) async {
    final event = data['event'] as String?;
    if (event == null) return;

    final current = _eventSubscribers[event];
    if (current != null) {
      current.remove(client);
      if (current.isEmpty) _eventSubscribers.remove(event);
    }

    client.sendJson({
      'event': 'unsubscribed',
      'subscription_event': event,
      'timestamp': DateTime.now().toIso8601String(),
    });

    print('Client ${client.id} unsubscribed from event: $event');
  }

  /// Publishes a client-supplied event to its subscribers, skipping the
  /// sender.
  Future<void> _onBroadcastEvent(
      SocketClient client, Map<String, dynamic> data) async {
    final event = data['event'] as String?;
    final eventData = data['data'];
    if (event == null) return;

    final envelope = <String, dynamic>{
      'event': event,
      'data': eventData,
      'from_user': client.get('user_id'),
      'timestamp': DateTime.now().toIso8601String(),
    };
    await broadcastToEvent(event, envelope, exclude: client);
  }

  /// Drops [client] from every subscription and prunes emptied sets.
  Future<void> _onDisconnect(SocketClient client) async {
    _eventSubscribers.removeWhere((_, members) {
      members.remove(client);
      return members.isEmpty;
    });
  }

  /// Sends [message] to every connected subscriber of [event] except
  /// [exclude].
  ///
  /// Subscribers whose send throws are removed from the subscription.
  Future<void> broadcastToEvent(
    String event,
    Map<String, dynamic> message, {
    SocketClient? exclude,
  }) async {
    final subscribers = _eventSubscribers[event];
    if (subscribers == null) return;

    // Collected separately so the set is not mutated while iterating it.
    final failed = <SocketClient>[];

    for (final subscriber in subscribers) {
      if (subscriber == exclude || !subscriber.isConnected) continue;
      try {
        subscriber.sendJson(message);
      } catch (e) {
        print('Failed to send to subscriber: $e');
        failed.add(subscriber);
      }
    }

    subscribers.removeAll(failed);
    if (subscribers.isEmpty) {
      _eventSubscribers.remove(event);
    }
  }

  /// Returns the number of events plus, per event, the subscriber count and
  /// subscriber user IDs.
  Map<String, dynamic> getSubscriptionStats() {
    final eventTotal = _eventSubscribers.length;
    return {
      'total_events': eventTotal,
      'events': {
        for (final entry in _eventSubscribers.entries)
          entry.key: {
            'subscriber_count': entry.value.length,
            'subscribers':
                entry.value.map((c) => c.get('user_id')).toList(),
          },
      },
    };
  }

  /// Whether [client] is currently subscribed to [event].
  bool isSubscribed(SocketClient client, String event) =>
      _eventSubscribers[event]?.contains(client) ?? false;

  /// Returns the names of all events [client] is subscribed to.
  List<String> getClientSubscriptions(SocketClient client) => [
        for (final entry in _eventSubscribers.entries)
          if (entry.value.contains(client)) entry.key,
      ];
}

// Usage example
/// Registers the presence (`user_online` / `user_offline`) and
/// `get_subscription_stats` handlers on [server], backed by an
/// [EventSubscriptionService].
///
/// Presence updates are published to subscribers of the `user_status` event.
void setupEventSubscription(SocketServer server) {
  final eventService = EventSubscriptionService(server);

  // Publishes one presence update. The payload names its own `event` so
  // receivers can route it like every other server message.
  Future<void> publishStatus(SocketClient client, String status) {
    return eventService.broadcastToEvent('user_status', {
      'event': 'user_status',
      'user_id': client.get('user_id'),
      'status': status,
      'timestamp': DateTime.now().toIso8601String()
    });
  }

  // Add some predefined events
  server.on('user_online', (SocketClient client, Map<String, dynamic> data) async {
    await publishStatus(client, 'online');
  });

  server.on('user_offline', (SocketClient client, Map<String, dynamic> data) async {
    await publishStatus(client, 'offline');
  });

  // Handle stats requests
  server.on('get_subscription_stats', (SocketClient client, Map<String, dynamic> data) async {
    final stats = eventService.getSubscriptionStats();
    client.sendJson({
      'event': 'subscription_stats',
      'data': stats
    });
  });
}

Event Subscription Features

  • Subscribe clients to specific events for targeted messaging
  • Broadcast events to all subscribed clients
  • Check subscription status and subscriber counts
  • Automatic cleanup when clients disconnect
  • Real-time subscription management

Middleware Pipeline

Middleware Types

dart
// SocketMiddleware types from the actual implementation
/// Base class for socket middleware.
///
/// Every hook has a permissive default, so subclasses override only the
/// stages they care about. The `bool` hooks default to `true`.
abstract class SocketMiddleware {
  /// Connection hook; the default returns `true`.
  Future<bool> handleConnection(SocketClient client, Request request) async =>
      true;

  /// Message hook, given the raw [message] text; the default returns `true`.
  Future<bool> handleMessage(SocketClient client, String message) async =>
      true;

  /// Room-join hook for [roomName]; the default returns `true`.
  Future<bool> handleRoomJoin(SocketClient client, String roomName) async =>
      true;

  /// Disconnect hook; the default does nothing.
  Future<void> handleDisconnect(SocketClient client) async {}
}

// Connection middleware example
/// Caps the number of simultaneous WebSocket connections per remote IP.
///
/// The counters are static, so every instance shares the same tally.
class ConnectionLimitMiddleware extends SocketMiddleware {
  /// Open-connection count per IP, shared across instances.
  static final Map<String, int> _connectionsPerIP = {};

  /// Maximum simultaneous connections accepted from a single IP.
  final int maxConnectionsPerIP;

  /// Creates the middleware; [maxConnectionsPerIP] defaults to 10.
  ConnectionLimitMiddleware({this.maxConnectionsPerIP = 10});

  /// Accepts the connection unless the caller's IP is already at the limit.
  ///
  /// On acceptance the IP is counted and stored on [client] under `'ip'` so
  /// [handleDisconnect] can release the slot later. On rejection the client
  /// receives a `connection_rejected` event and `false` is returned.
  @override
  Future<bool> handleConnection(SocketClient client, Request request) async {
    final ip = request.ip;
    final currentCount = _connectionsPerIP[ip] ?? 0;

    if (currentCount >= maxConnectionsPerIP) {
      client.sendJson({
        'event': 'connection_rejected',
        'reason': 'Too many connections from this IP'
      });
      return false;
    }

    _connectionsPerIP[ip] = currentCount + 1;
    client.set('ip', ip);

    return true;
  }

  /// Releases the slot held by [client], if it was counted.
  ///
  /// Clients without a stored `'ip'` have no counter entry and are ignored.
  @override
  Future<void> handleDisconnect(SocketClient client) async {
    final ip = client.get('ip');
    final count = _connectionsPerIP[ip];
    if (count == null) return;

    // Drop the entry instead of storing zero so the map does not accumulate
    // idle IPs; `<= 1` also keeps the counter from ever going negative.
    if (count <= 1) {
      _connectionsPerIP.remove(ip);
    } else {
      _connectionsPerIP[ip] = count - 1;
    }
  }
}

// Message validation middleware
/// Rejects messages that are not a JSON object with a non-empty string
/// `event` field and a `data` field.
class MessageValidationMiddleware extends SocketMiddleware {
  /// Validates the raw [message] text.
  ///
  /// Returns `true` when the message is well-formed. Otherwise sends an
  /// `error` event describing the problem to [client] and returns `false`.
  @override
  Future<bool> handleMessage(SocketClient client, String message) async {
    Object? decoded;
    try {
      decoded = jsonDecode(message);
    } on FormatException {
      // Only genuine parse failures are reported as invalid JSON.
      _reject(client, 'Invalid JSON format');
      return false;
    }

    // Valid JSON that is not an object (array, string, number, null) has no
    // fields at all, so it fails the same check as a missing field.
    if (decoded is! Map<String, dynamic> ||
        !decoded.containsKey('event') ||
        !decoded.containsKey('data')) {
      _reject(client, 'Invalid message format: missing event or data field');
      return false;
    }

    // Validate event type
    final event = decoded['event'];
    if (event is! String || event.isEmpty) {
      _reject(client, 'Invalid event type');
      return false;
    }

    return true;
  }

  /// Sends an `error` event carrying [reason] to [client].
  void _reject(SocketClient client, String reason) {
    client.sendJson({
      'event': 'error',
      'message': reason
    });
  }
}

// Room permission middleware
/// Gates room joins by room-name prefix.
///
///  * `public:`  rooms are open to everyone.
///  * `private:` rooms are open only to the user whose ID follows the prefix.
///  * `admin:`   rooms require the client's `is_admin` flag to be `true`.
///  * Rooms with any other name are open to everyone.
class RoomPermissionMiddleware extends SocketMiddleware {
  /// Returns whether [client] may join [roomName].
  ///
  /// Denied joins to private or admin rooms also send a `room_join_denied`
  /// event to [client].
  @override
  Future<bool> handleRoomJoin(SocketClient client, String roomName) async {
    final userId = client.get('user_id');

    // Public rooms (no permission check)
    if (roomName.startsWith('public:')) {
      return true;
    }

    // Private rooms (user-specific)
    if (roomName.startsWith('private:')) {
      // A missing user ID must never match: interpolating null would yield
      // "private:null" and let clients without a user ID into that room.
      final allowed = userId != null && roomName == 'private:$userId';
      if (!allowed) {
        _deny(client, roomName, 'Private room access denied');
      }
      return allowed;
    }

    // Admin rooms
    if (roomName.startsWith('admin:')) {
      final isAdmin = client.get('is_admin') == true;
      if (!isAdmin) {
        _deny(client, roomName, 'Admin access required');
        return false;
      }
    }

    return true;
  }

  /// Tells [client] that joining [roomName] was refused and why.
  void _deny(SocketClient client, String roomName, String reason) {
    client.sendJson({
      'event': 'room_join_denied',
      'room': roomName,
      'reason': reason
    });
  }
}

// Logging middleware
/// Prints a line for each connection, message and disconnect. Never blocks
/// anything: both `bool` hooks always return `true`.
class LoggingMiddleware extends SocketMiddleware {
  /// Label used in log lines: the client's `user_id`, or `'anonymous'`.
  dynamic _userLabel(SocketClient client) =>
      client.get('user_id') ?? 'anonymous';

  /// Logs the remote IP and the current time.
  @override
  Future<bool> handleConnection(SocketClient client, Request request) async {
    print('WebSocket connection from ${request.ip} at ${DateTime.now()}');
    return true;
  }

  /// Logs the sender and at most the first 100 characters of [message].
  @override
  Future<bool> handleMessage(SocketClient client, String message) async {
    final userId = _userLabel(client);
    print('Message from user $userId: ${message.substring(0, min(100, message.length))}');
    return true;
  }

  /// Logs which user disconnected.
  @override
  Future<void> handleDisconnect(SocketClient client) async {
    final userId = _userLabel(client);
    print('User $userId disconnected');
  }
}

Authentication Middleware

dart
// Authentication as a class-based connection middleware (a callback-based alternative using server.useAuth() is shown further below)
/// Connection middleware that authenticates a client from a token and stores
/// the resulting identity on the client.
class AuthMiddleware extends SocketMiddleware {
  /// Accepts the connection only when a token is present and verifies.
  ///
  /// The token is read from the `token` query parameter, falling back to the
  /// `authorization` header. On success `user_id`, `username`, `is_admin` and
  /// `authenticated` are set on [client] and an `authenticated` event is sent.
  /// On failure an `auth_required` or `auth_failed` event is sent and `false`
  /// is returned.
  @override
  Future<bool> handleConnection(SocketClient client, Request request) async {
    // NOTE(review): the header value is passed to the verifier unchanged; if
    // callers send "Bearer <jwt>" the scheme prefix is not stripped here.
    final token = request.query['token'] ?? request.headers['authorization'];

    final tokenMissing = token == null || token.isEmpty;
    if (tokenMissing) {
      client.sendJson({
        'event': 'auth_required',
        'message': 'Authentication token required'
      });
      return false;
    }

    try {
      final payload = await verifyJWTToken(token);
      if (payload == null) {
        throw Exception('Invalid token');
      }

      // Remember who this is for later handlers and middleware.
      client
        ..set('user_id', payload['sub'])
        ..set('username', payload['username'])
        ..set('is_admin', payload['role'] == 'admin')
        ..set('authenticated', true);

      final confirmation = <String, dynamic>{
        'event': 'authenticated',
        'user_id': payload['sub'],
        'username': payload['username'],
      };
      client.sendJson(confirmation);

      return true;
    } catch (e) {
      client.sendJson({
        'event': 'auth_failed',
        'message': 'Authentication failed: ${e.toString()}'
      });
      return false;
    }
  }
}

// Usage with SocketServer
/// Creates a [SocketServer] with an auth callback, a pass-through connection
/// middleware and the `join_room` / `send_message` handlers.
void setupWebSocketServer() {
  final socketServer = SocketServer();

  // Authentication: yields the user ID from the token's claims, or null when
  // no token is supplied or no claims come back.
  socketServer.useAuth((SocketClient client, Request request) async {
    final token = request.query['token'];
    if (token == null) return null;

    final claims = await verifyJWTToken(token);
    return claims?['sub'];
  });

  // Placeholder connection middleware: accepts every connection.
  socketServer.use((SocketClient client, Request request) async {
    return true;
  });

  // join_room: requests without a `room` field are ignored.
  socketServer.on('join_room',
      (SocketClient client, Map<String, dynamic> data) async {
    final roomName = data['room'];
    if (roomName == null) return;

    await client.joinRoom(roomName);
    client.sendJson({'event': 'room_joined', 'room': roomName});
  });

  // send_message: relay to the room, skipping the sender itself.
  socketServer.on('send_message',
      (SocketClient client, Map<String, dynamic> data) async {
    final roomName = data['room'];
    final message = data['message'];
    if (roomName == null || message == null) return;

    final outgoing = <String, dynamic>{
      'event': 'message',
      'user_id': client.get('user_id'),
      'username': client.get('username'),
      'message': message,
      'timestamp': DateTime.now().toIso8601String(),
    };
    await socketServer.broadcastToRoom(roomName, outgoing, exclude: client);
  });
}

// Helper function for JWT verification
/// Placeholder JWT verifier used by the examples.
///
/// WARNING: [token] is never inspected here; every call yields the same
/// hard-coded claims. Replace the body with a proper JWT library before use.
///
/// Returns the claims map, or null if verification throws.
Future<Map<String, dynamic>?> verifyJWTToken(String token) async {
  try {
    // Implement your JWT verification logic here.
    final placeholderClaims = <String, dynamic>{
      'sub': 'user123',
      'username': 'john_doe',
      'role': 'user',
    };
    return placeholderClaims;
  } catch (_) {
    return null;
  }
}

Rate Limiting Middleware

dart
// Rate limiting middleware for SocketClient
/// Per-user, per-event message rate limiting.
///
/// Events listed in [_limits] each get their own per-minute quota. All other
/// events share a single default quota per user, so a client cannot obtain
/// fresh quotas (or grow the limiter table without bound) by inventing new
/// event names.
class RateLimitMiddleware extends SocketMiddleware {
  /// Limiter per `userId:bucket` key, shared across instances.
  ///
  /// Entries are not evicted here; the number of keys per user is bounded by
  /// the number of known events plus one.
  static final Map<String, RateLimiter> _rateLimiters = {};

  /// Bucket name shared by every event that has no entry in [_limits].
  static const String _defaultBucket = '*';

  /// Requests per minute allowed in the default bucket.
  static const int _defaultLimit = 30;

  final Map<String, int> _limits = {
    'message': 60,      // 60 messages per minute
    'join_room': 10,    // 10 room joins per minute
    'leave_room': 10,   // 10 room leaves per minute
    'ping': 120,        // 120 pings per minute
  };

  /// Returns `false` (after sending a `rate_limited` event) when [client]
  /// has exhausted the quota for the message's event, `true` otherwise.
  ///
  /// Messages that cannot be parsed, are not JSON objects, or carry no
  /// string `event` are allowed through unchanged.
  @override
  Future<bool> handleMessage(SocketClient client, String message) async {
    Object? decoded;
    try {
      decoded = jsonDecode(message);
    } on FormatException {
      // If message parsing fails, allow it through
      return true;
    }
    if (decoded is! Map<String, dynamic>) return true;

    final event = decoded['event'];
    if (event is! String) return true;

    // Stringify the ID: a non-String user ID must not raise and thereby skip
    // the limiter.
    final userId = '${client.get('user_id') ?? client.id}';
    final isKnownEvent = _limits.containsKey(event);
    final limit = _limits[event] ?? _defaultLimit;
    final bucket = isKnownEvent ? event : _defaultBucket;

    final limiter = _rateLimiters.putIfAbsent(
      '$userId:$bucket',
      () => RateLimiter(maxRequests: limit, window: Duration(minutes: 1)),
    );

    final result = await limiter.check(userId);
    if (result.allowed) return true;

    client.sendJson({
      'event': 'rate_limited',
      'message': 'Too many $event requests. Try again later.',
      'retry_after_seconds': result.retryAfter?.inSeconds ?? 60
    });
    return false;
  }
}

// Rate limiter implementation
/// Sliding-window rate limiter that keeps a separate history for every
/// identifier passed to [check].
class RateLimiter {
  /// Maximum number of requests allowed per identifier within [window].
  final int maxRequests;

  /// Length of the sliding window.
  final Duration window;

  /// Timestamps of the accepted requests, per identifier.
  final Map<String, List<DateTime>> _requestsById = {};

  /// When idle identifiers were last swept out of [_requestsById].
  DateTime? _lastSweep;

  RateLimiter({required this.maxRequests, required this.window});

  /// Records a request for [identifier] and reports whether it is allowed.
  ///
  /// A rejected request is not recorded. For a rejection,
  /// [RateLimitResult.retryAfter] is the time until the oldest recorded
  /// request of that identifier leaves the window, or the whole [window] when
  /// nothing is recorded (only possible when [maxRequests] is not positive).
  Future<RateLimitResult> check(String identifier) async {
    final now = DateTime.now();
    _sweepIdle(now);

    // Remove old requests outside the window
    final history = _requestsById.putIfAbsent(identifier, () => [])
      ..removeWhere((time) => now.difference(time) > window);

    if (history.length >= maxRequests) {
      final retryAfter = history.isEmpty
          ? window
          : history.first.add(window).difference(now);
      return RateLimitResult(allowed: false, retryAfter: retryAfter);
    }

    history.add(now);
    return RateLimitResult(allowed: true, retryAfter: null);
  }

  /// Forgets identifiers with no requests left inside the window.
  ///
  /// Runs at most once per [window] so the per-call cost stays small.
  void _sweepIdle(DateTime now) {
    final last = _lastSweep;
    if (last != null && now.difference(last) <= window) return;
    _lastSweep = now;

    _requestsById.removeWhere((_, times) {
      times.removeWhere((time) => now.difference(time) > window);
      return times.isEmpty;
    });
  }
}

/// Outcome of a [RateLimiter.check] call.
class RateLimitResult {
  /// Whether the request may proceed.
  final bool allowed;

  /// How long to wait before retrying; null when the request was allowed.
  final Duration? retryAfter;

  const RateLimitResult({required this.allowed, this.retryAfter});
}

// Usage with SocketServer
/// Creates a [SocketServer] with a pass-through middleware and a connection
/// rate limit of 100 checks per minute.
void setupRateLimitedServer() {
  final socketServer = SocketServer();

  // Pass-through placeholder: accepts every connection.
  // NOTE(review): no RateLimitMiddleware instance is attached in this
  // function; per-message limiting must be registered separately.
  socketServer.use((SocketClient client, Request request) async {
    return true;
  });

  // Connection limiter, consulted with the caller's IP on every connect.
  final connectionLimiter = RateLimiter(
    maxRequests: 100,
    window: Duration(minutes: 1),
  );

  socketServer.use((SocketClient client, Request request) async {
    final verdict = await connectionLimiter.check(request.ip);
    if (verdict.allowed) return true;

    client.sendJson({
      'event': 'connection_rate_limited',
      'message': 'Too many connections from this IP'
    });
    return false;
  });
}

Client Management

dart
// Client management using SocketClient and SocketManager
/// Tracks connected [SocketClient]s and a metadata map for each of them.
///
/// Clients are keyed by their `user_id` context value, falling back to
/// `client.id` when that value is absent.
class ClientManager {
  /// Manager used for broadcasting to every connected client.
  final SocketManager manager;

  final Map<String, SocketClient> _clients = {};
  final Map<String, Map<String, dynamic>> _clientMetadata = {};

  ClientManager(this.manager);

  /// Registers [client] and records its initial metadata.
  ///
  /// A client already stored under the same key is replaced.
  void addClient(SocketClient client) {
    final userId = client.get('user_id') ?? client.id;
    _clients[userId] = client;

    // Store metadata
    final connectedAt = DateTime.now().toIso8601String();
    _clientMetadata[userId] = {
      'connected_at': connectedAt,
      'ip': client.get('ip'),
      'user_agent': client.get('user_agent'),
      'rooms': <String>[],
      'last_activity': DateTime.now().toIso8601String()
    };

    print('Client added: $userId');
  }

  /// Removes the client stored under [userId], if any.
  ///
  /// The client leaves every room listed in its `rooms` metadata entry. That
  /// entry starts empty and only changes through [updateClientMetadata].
  void removeClient(String userId) {
    final client = _clients.remove(userId);
    if (client == null) return;

    // Leave all rooms
    final metadata = _clientMetadata.remove(userId);
    if (metadata != null) {
      final rooms = List<String>.from(metadata['rooms'] ?? []);
      for (final room in rooms) {
        client.leaveRoom(room);
      }
    }

    print('Client removed: $userId');
  }

  /// Returns the client stored under [userId], or `null`.
  SocketClient? getClient(String userId) {
    return _clients[userId];
  }

  /// Returns a snapshot list of all stored clients.
  List<SocketClient> getAllClients() {
    return _clients.values.toList();
  }

  /// Sets metadata [key] to [value] for [userId] and refreshes
  /// `last_activity`. Does nothing when [userId] is unknown.
  void updateClientMetadata(String userId, String key, dynamic value) {
    final metadata = _clientMetadata[userId];
    if (metadata == null) return;

    metadata[key] = value;
    metadata['last_activity'] = DateTime.now().toIso8601String();
  }

  /// Returns the live metadata map for [userId], or `null`.
  ///
  /// The returned map is the internal instance, not a copy.
  Map<String, dynamic>? getClientMetadata(String userId) {
    return _clientMetadata[userId];
  }

  /// Broadcasts [message] through the [manager].
  Future<void> broadcastToAll(Map<String, dynamic> message) async {
    await manager.broadcast(message);
  }

  /// Sends [message] to [userId] when that client is stored and connected;
  /// otherwise the message is dropped silently.
  Future<void> sendToClient(String userId, Map<String, dynamic> message) async {
    final client = _clients[userId];
    if (client != null && client.isConnected) {
      client.sendJson(message);
    }
  }

  /// Returns aggregate counts plus one summary entry per stored client.
  Map<String, dynamic> getStats() {
    final activeClients = _clients.values.where((c) => c.isConnected).length;

    return {
      'total_clients': _clients.length,
      'active_clients': activeClients,
      'inactive_clients': _clients.length - activeClients,
      'clients': _clientMetadata.entries.map((entry) => {
        'user_id': entry.key,
        'connected_at': entry.value['connected_at'],
        'last_activity': entry.value['last_activity'],
        'rooms': entry.value['rooms'],
        'is_active': _clients[entry.key]?.isConnected ?? false
      }).toList()
    };
  }

  /// Removes every stored client whose `isConnected` is false.
  void cleanupInactiveClients() {
    // Collect the keys first: removeClient mutates _clients, which must not
    // happen while iterating over it.
    final inactiveClients = [
      for (final entry in _clients.entries)
        if (!entry.value.isConnected) entry.key,
    ];

    for (final userId in inactiveClients) {
      removeClient(userId);
    }
  }
}

// SocketClient extension methods
/// Room bookkeeping and JSON helpers built on the client's context storage.
///
/// Room membership is kept in the `rooms` context entry.
///
/// NOTE(review): Dart resolves instance members before extension members, so
/// if SocketClient already declares `joinRoom`, `leaveRoom` or `sendJson`,
/// the versions below are never invoked — confirm against the real class.
extension SocketClientExtensions on SocketClient {
  /// Whether the `rooms` context entry contains [roomName].
  ///
  /// Returns `false` when the entry is missing.
  bool isInRoom(String roomName) =>
      get('rooms')?.contains(roomName) ?? false;

  /// Returns a fresh copy of the `rooms` context entry (empty when missing).
  List<String> getRooms() => List<String>.from(get('rooms') ?? []);

  /// Adds [roomName] to the `rooms` context entry unless already present.
  Future<void> joinRoom(String roomName) async {
    final current = getRooms();
    if (current.contains(roomName)) {
      return;
    }
    current.add(roomName);
    set('rooms', current);
  }

  /// Removes [roomName] from the `rooms` context entry.
  ///
  /// The entry is written back even when [roomName] was not in it.
  Future<void> leaveRoom(String roomName) async {
    final remaining = getRooms()..remove(roomName);
    set('rooms', remaining);
  }

  /// Encodes [data] as JSON and sends it.
  ///
  /// Encoding or sending failures are printed, not rethrown.
  void sendJson(Map<String, dynamic> data) {
    try {
      send(jsonEncode(data));
    } catch (failure) {
      print('Failed to send JSON message: $failure');
    }
  }
}

// Usage example
/// Wires a [ClientManager] into [server]: tracks connects and disconnects and
/// answers the `get_client_info` and `get_client_stats` events.
void setupClientManagement(SocketServer server, SocketManager manager) {
  final clientManager = ClientManager(manager);

  // Handle client connections
  server.onConnect((SocketClient client) async {
    clientManager.addClient(client);

    // Send welcome message
    client.sendJson({
      'event': 'welcome',
      'user_id': client.get('user_id'),
      'timestamp': DateTime.now().toIso8601String()
    });
  });

  // Handle client disconnections
  server.onDisconnect((SocketClient client) async {
    // Same key derivation as ClientManager.addClient.
    final userId = client.get('user_id') ?? client.id;
    clientManager.removeClient(userId);
  });

  // Handle client info requests
  server.on('get_client_info', (SocketClient client, Map<String, dynamic> data) async {
    // The payload is supplied by the remote client, so type-check it rather
    // than casting: a cast would throw on a non-string `user_id`.
    final userId = data['user_id'];
    if (userId is! String) return;

    // NOTE(review): any client can request any user's metadata (including
    // `ip` and `user_agent`) — confirm this exposure is intended.
    final metadata = clientManager.getClientMetadata(userId);
    if (metadata == null) return;

    client.sendJson({
      'event': 'client_info',
      'user_id': userId,
      'data': metadata
    });
  });

  // Handle stats requests
  server.on('get_client_stats', (SocketClient client, Map<String, dynamic> data) async {
    final stats = clientManager.getStats();
    client.sendJson({
      'event': 'client_stats',
      'data': stats
    });
  });
}

Client Context Features

Context Storage
  • set(key, value) - Store data in client context
  • get(key) - Retrieve data from context
  • isAuthorized - Check authorization status
  • isAuthenticated - Check authentication status
Header Access
  • authToken - Get authorization token
  • userAgent - Get user agent string
  • getHeader(name) - Get specific header
  • getHeaderValues(name) - Get all header values

Error Handling

dart
/// Logs WebSocket errors and reports them to the affected socket as JSON.
class ErrorHandler {
  /// Logs [error] (and [stackTrace] when given), then sends a generic
  /// `error` payload to [socket].
  static void handle(WebSocket socket, dynamic error, StackTrace? stackTrace) {
    print('WebSocket error: $error');
    if (stackTrace != null) {
      print('Stack trace: $stackTrace');
    }

    // Send error to client
    _sendSafely(
      socket,
      () => {
        'type': 'error',
        'message': 'An error occurred',
        'timestamp': DateTime.now().toIso8601String()
      },
      failurePrefix: 'Failed to send error to client',
    );
  }

  /// Logs a connection [error] and sends a `connection_error` payload that
  /// includes the error text.
  static void handleConnectionError(WebSocket socket, dynamic error) {
    print('Connection error: $error');

    // No failure prefix: the socket might be closed, so a failed send is
    // ignored without logging.
    _sendSafely(
      socket,
      () => {
        'type': 'connection_error',
        'message': 'Connection failed',
        'error': error.toString()
      },
    );
  }

  /// Logs a message-processing [error] and sends a `message_error` payload
  /// that echoes the original [message] and the error text.
  static void handleMessageError(WebSocket socket, String message, dynamic error) {
    print('Message processing error: $error');

    _sendSafely(
      socket,
      () => {
        'type': 'message_error',
        'message': 'Failed to process message',
        'original_message': message,
        'error': error.toString()
      },
      failurePrefix: 'Failed to send message error',
    );
  }

  // Builds, encodes and sends a payload. Any failure along the way is printed
  // with [failurePrefix], or dropped silently when no prefix is given.
  static void _sendSafely(
    WebSocket socket,
    Map<String, dynamic> Function() buildPayload, {
    String? failurePrefix,
  }) {
    try {
      socket.send(jsonEncode(buildPayload()));
    } catch (sendFailure) {
      if (failurePrefix != null) {
        print('$failurePrefix: $sendFailure');
      }
    }
  }
}

Exception Handler Features

  • Automatic error logging with stack traces
  • Client-specific error responses
  • Development vs production error details
  • Middleware error handling
  • Connection and disconnection error handling

Advanced Usage

Real-time Chat Application

dart
// Real-time chat application
/// In-memory chat rooms: membership, message history, broadcasting and
/// private messages, with asynchronous persistence through `Database`.
///
/// All state is static, so it is shared by every caller in the isolate.
class ChatRoom {
  // Number of most recent messages replayed to a client that joins a room.
  static const int _historyReplayLimit = 50;

  // Maximum number of messages kept in memory per room.
  static const int _historyLimit = 1000;

  // Sockets currently in each room.
  static final Map<String, Set<WebSocket>> _rooms = {};

  // Per room: user id -> {'username', 'joined_at', 'socket'}.
  static final Map<String, Map<String, dynamic>> _roomUsers = {};

  // Per room: messages in arrival order, oldest first.
  static final Map<String, List<Map<String, dynamic>>> _messageHistory = {};

  /// Adds [socket] to [roomName] as [userId].
  ///
  /// Replays the most recent history to the new socket, notifies the other
  /// members, then sends the joiner a `room_joined` summary.
  static Future<void> joinRoom(String roomName, WebSocket socket, String userId, String username) async {
    _rooms.putIfAbsent(roomName, () => <WebSocket>{}).add(socket);
    _roomUsers.putIfAbsent(roomName, () => <String, dynamic>{})[userId] = {
      'username': username,
      'joined_at': DateTime.now().toIso8601String(),
      'socket': socket
    };

    // Send recent message history
    final history = _messageHistory[roomName] ?? [];
    if (history.isNotEmpty) {
      // History is stored oldest first, so the most recent messages are the
      // tail of the list, not the head.
      final start = history.length > _historyReplayLimit
          ? history.length - _historyReplayLimit
          : 0;
      socket.send(jsonEncode({
        'type': 'message_history',
        'room': roomName,
        'messages': history.sublist(start) // Last 50 messages
      }));
    }

    // Notify room members
    await broadcastToRoom(roomName, {
      'type': 'user_joined',
      'user_id': userId,
      'username': username,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String()
    }, exclude: socket);

    // Send room info
    socket.send(jsonEncode({
      'type': 'room_joined',
      'room': roomName,
      'user_count': _rooms[roomName]?.length ?? 0,
      'users': _describeUsers(roomName)
    }));
  }

  /// Removes [socket] and [userId] from [roomName] and notifies the remaining
  /// members. A room left empty is deleted together with its history.
  static Future<void> leaveRoom(String roomName, WebSocket socket, String userId) async {
    final room = _rooms[roomName];
    final roomUsers = _roomUsers[roomName];
    if (room == null || roomUsers == null) return;

    room.remove(socket);
    final userInfo = roomUsers[userId];

    if (userInfo != null) {
      final username = userInfo['username'];

      // Notify remaining members
      await broadcastToRoom(roomName, {
        'type': 'user_left',
        'user_id': userId,
        'username': username,
        'room': roomName,
        'timestamp': DateTime.now().toIso8601String()
      });

      roomUsers.remove(userId);
    }

    // Clean up empty rooms
    if (room.isEmpty) {
      _rooms.remove(roomName);
      _roomUsers.remove(roomName);
      _messageHistory.remove(roomName);
    }
  }

  /// Records [message] in the history of [roomName], broadcasts it to the
  /// room and saves it to the database without waiting for the save.
  static Future<void> sendMessage(String roomName, String userId, String username, String message) async {
    final messageData = <String, dynamic>{
      'id': generateMessageId(),
      'user_id': userId,
      'username': username,
      'message': message,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String()
    };

    // Store in history
    final history = _messageHistory.putIfAbsent(roomName, () => []);
    history.add(messageData);

    // Keep only last 1000 messages
    if (history.length > _historyLimit) {
      history.removeRange(0, history.length - _historyLimit);
    }

    // Broadcast to room
    await broadcastToRoom(roomName, {
      'type': 'message',
      ...messageData
    });

    // Save to database asynchronously
    unawaited(saveMessageToDatabase(messageData));
  }

  /// Broadcasts a typing indicator for [userId] to everyone else in
  /// [roomName].
  static Future<void> handleTyping(String roomName, String userId, String username, bool isTyping) async {
    await broadcastToRoom(roomName, {
      'type': 'typing',
      'user_id': userId,
      'username': username,
      'is_typing': isTyping,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String()
    }, excludeUserId: userId);
  }

  /// Sends a private message from [fromUserId] to every distinct socket
  /// registered for [toUserId], then saves it to the database without waiting.
  static Future<void> sendPrivateMessage(String fromUserId, String fromUsername,
                                         String toUserId, String message) async {
    final messageData = <String, dynamic>{
      'id': generateMessageId(),
      'from_user_id': fromUserId,
      'from_username': fromUsername,
      'to_user_id': toUserId,
      'message': message,
      'timestamp': DateTime.now().toIso8601String()
    };

    // Find recipient's sockets. A set is used so that a socket registered in
    // several rooms receives the message only once.
    final recipients = <WebSocket>{};
    for (final users in _roomUsers.values) {
      final candidate = users[toUserId]?['socket'];
      if (candidate is WebSocket) {
        recipients.add(candidate);
      }
    }

    if (recipients.isNotEmpty) {
      final payload = jsonEncode({
        'type': 'private_message',
        ...messageData
      });
      for (final socket in recipients) {
        socket.send(payload);
      }
    }

    // Save to database
    unawaited(savePrivateMessageToDatabase(messageData));
  }

  /// Sends [message] to every socket in [roomName].
  ///
  /// [message] is sent as-is when it is a `String`, otherwise JSON-encoded.
  /// [exclude] skips one socket; [excludeUserId] skips the sockets of one
  /// user. A socket whose send throws is removed from the room.
  static Future<void> broadcastToRoom(String roomName, dynamic message, {
    WebSocket? exclude,
    String? excludeUserId
  }) async {
    final room = _rooms[roomName];
    if (room == null) return;

    final messageStr = message is String ? message : jsonEncode(message);

    // Iterate over a snapshot: failing sockets are removed from the live set
    // inside the loop, which is not allowed while iterating the set itself.
    for (final socket in room.toList()) {
      if (socket == exclude) continue;
      if (excludeUserId != null &&
          getUserIdForSocket(socket) == excludeUserId) {
        continue;
      }

      try {
        socket.send(messageStr);
      } catch (e) {
        print('Failed to broadcast to room $roomName: $e');
        room.remove(socket);
      }
    }
  }

  /// Returns the name, member count, member list and stored message count of
  /// [roomName]. Unknown rooms yield zero counts and an empty member list.
  static Map<String, dynamic> getRoomInfo(String roomName) {
    return {
      'name': roomName,
      'user_count': _rooms[roomName]?.length ?? 0,
      'users': _describeUsers(roomName),
      'message_count': _messageHistory[roomName]?.length ?? 0
    };
  }

  // Public view of the users of a room; leaves out the stored socket.
  static List<Map<String, dynamic>> _describeUsers(String roomName) {
    final users = _roomUsers[roomName];
    if (users == null) return [];

    return [
      for (final entry in users.entries)
        {
          'user_id': entry.key,
          'username': entry.value['username'],
          'joined_at': entry.value['joined_at']
        },
    ];
  }

  // Helper functions

  /// Returns an id built from the current epoch milliseconds and a random
  /// number below 1000.
  static String generateMessageId() {
    return 'msg_${DateTime.now().millisecondsSinceEpoch}_${Random().nextInt(1000)}';
  }

  /// Returns the id of the first user, in any room, registered with
  /// [socket], or `null` when there is none.
  static String? getUserIdForSocket(WebSocket socket) {
    for (final roomUsers in _roomUsers.values) {
      for (final entry in roomUsers.entries) {
        if (entry.value['socket'] == socket) {
          return entry.key;
        }
      }
    }
    return null;
  }

  /// Inserts [message] into the `chat_messages` table. Failures are printed,
  /// not rethrown.
  static Future<void> saveMessageToDatabase(Map<String, dynamic> message) async {
    try {
      await Database.table('chat_messages').insert({
        'id': message['id'],
        'user_id': message['user_id'],
        'room': message['room'],
        'message': message['message'],
        'created_at': message['timestamp']
      });
    } catch (e) {
      print('Failed to save message to database: $e');
    }
  }

  /// Inserts [message] into the `private_messages` table. Failures are
  /// printed, not rethrown.
  static Future<void> savePrivateMessageToDatabase(Map<String, dynamic> message) async {
    try {
      await Database.table('private_messages').insert({
        'id': message['id'],
        'from_user_id': message['from_user_id'],
        'to_user_id': message['to_user_id'],
        'message': message['message'],
        'created_at': message['timestamp']
      });
    } catch (e) {
      print('Failed to save private message to database: $e');
    }
  }
}

// Chat message handler
/// Decodes raw chat messages and dispatches them to [ChatRoom] by `type`.
class ChatMessageHandler {
  /// Handles one raw [message] received on [socket] from [userId].
  ///
  /// Messages that are not a JSON object, or whose fields have the wrong
  /// type, are answered with `Invalid message format`. Messages whose
  /// required fields are missing are ignored. Any other failure is logged
  /// and answered with `Failed to process message`, so internal errors are
  /// no longer reported to the client as a formatting problem.
  static Future<void> handleMessage(WebSocket socket, String message, String userId, String username) async {
    try {
      final data = jsonDecode(message);
      if (data is! Map<String, dynamic>) {
        throw const FormatException('Message must be a JSON object');
      }

      switch (data['type']) {
        case 'join_room':
          final roomName = _optionalString(data, 'room');
          if (roomName != null) {
            await ChatRoom.joinRoom(roomName, socket, userId, username);
          }
          break;

        case 'leave_room':
          final roomName = _optionalString(data, 'room');
          if (roomName != null) {
            await ChatRoom.leaveRoom(roomName, socket, userId);
          }
          break;

        case 'send_message':
          final roomName = _optionalString(data, 'room');
          final messageText = _optionalString(data, 'message');
          if (roomName != null && messageText != null) {
            await ChatRoom.sendMessage(roomName, userId, username, messageText);
          }
          break;

        case 'typing':
          final roomName = _optionalString(data, 'room');
          final isTyping = data['is_typing'] ?? false;
          if (isTyping is! bool) {
            throw const FormatException('Field "is_typing" must be a boolean');
          }
          if (roomName != null) {
            await ChatRoom.handleTyping(roomName, userId, username, isTyping);
          }
          break;

        case 'private_message':
          final toUserId = _optionalString(data, 'to_user');
          final messageText = _optionalString(data, 'message');
          if (toUserId != null && messageText != null) {
            await ChatRoom.sendPrivateMessage(userId, username, toUserId, messageText);
          }
          break;

        case 'get_room_info':
          final roomName = _optionalString(data, 'room');
          if (roomName != null) {
            socket.send(jsonEncode({
              'type': 'room_info',
              'room': roomName,
              'info': ChatRoom.getRoomInfo(roomName)
            }));
          }
          break;

        default:
          socket.send(jsonEncode({
            'type': 'error',
            'message': 'Unknown message type: ${data['type']}'
          }));
      }
    } on FormatException {
      // Malformed JSON or a field of the wrong type: the client's fault.
      _sendError(socket, 'Invalid message format');
    } catch (e, stackTrace) {
      // Anything else failed on the server side; log it instead of blaming
      // the message format.
      print('Message processing error: $e');
      print('Stack trace: $stackTrace');
      _sendError(socket, 'Failed to process message');
    }
  }

  // Reads an optional string field: null when absent, FormatException when
  // present with a non-string value.
  static String? _optionalString(Map<String, dynamic> data, String key) {
    final value = data[key];
    if (value == null) return null;
    if (value is String) return value;
    throw FormatException('Field "$key" must be a string');
  }

  // Sends an `error` payload. A failing send is printed rather than allowed
  // to escape from the error path.
  static void _sendError(WebSocket socket, String text) {
    try {
      socket.send(jsonEncode({
        'type': 'error',
        'message': text
      }));
    } catch (e) {
      print('Failed to send error to client: $e');
    }
  }
}

Live Notifications

dart
/// Delivers JSON notifications to the sockets registered for each user.
///
/// State is static, so registrations are shared by every caller.
class NotificationManager {
  // Sockets registered per user id; a user may have several.
  static final Map<String, Set<WebSocket>> _userSockets = {};

  /// Registers [socket] as one of the sockets of [userId].
  static void registerUser(String userId, WebSocket socket) {
    _userSockets.putIfAbsent(userId, () => <WebSocket>{}).add(socket);
  }

  /// Removes [socket] from [userId]; the user entry is dropped once its last
  /// socket is gone.
  static void unregisterUser(String userId, WebSocket socket) {
    final sockets = _userSockets[userId];
    if (sockets == null) return;

    sockets.remove(socket);
    if (sockets.isEmpty) {
      _userSockets.remove(userId);
    }
  }

  /// Sends a notification to every socket of [userId].
  ///
  /// Does nothing when the user has no registered socket.
  static void sendNotification(String userId, String title, String message, {
    String type = 'info',
    Map<String, dynamic>? data
  }) {
    final sockets = _userSockets[userId];
    if (sockets == null || sockets.isEmpty) return;

    _deliver(sockets, _encode(title, message, type, data));
  }

  /// Sends the same notification to every registered socket of every user.
  static void broadcastNotification(String title, String message, {
    String type = 'info',
    Map<String, dynamic>? data
  }) {
    if (_userSockets.isEmpty) return;

    // Encode once; every recipient gets the identical payload and timestamp.
    final payload = _encode(title, message, type, data);
    for (final sockets in _userSockets.values) {
      _deliver(sockets, payload);
    }
  }

  // Builds and JSON-encodes the notification payload.
  static String _encode(
    String title,
    String message,
    String type,
    Map<String, dynamic>? data,
  ) {
    return jsonEncode({
      'type': 'notification',
      'title': title,
      'message': message,
      'notification_type': type,
      'data': data,
      'timestamp': DateTime.now().toIso8601String()
    });
  }

  // Sends [payload] to each socket. A failing socket is logged and skipped so
  // it cannot stop delivery to the remaining ones.
  static void _deliver(Iterable<WebSocket> sockets, String payload) {
    for (final socket in sockets.toList()) {
      try {
        socket.send(payload);
      } catch (e) {
        print('Failed to send notification: $e');
      }
    }
  }
}

Game Server

dart
/// Static registry of game rooms and players.
class GameServer {
  // Rooms by room id.
  static final Map<String, GameRoom> _rooms = {};

  // Most recently created Player per player id.
  static final Map<String, Player> _players = {};

  /// Creates a room of [gameType] hosted by [hostId] and returns its id.
  static String createRoom(String gameType, String hostId) {
    final id = generateRoomId();
    _rooms[id] = GameRoom(id, gameType, hostId);
    return id;
  }

  /// Adds a new player for [playerId] and [socket] to the room [roomId].
  ///
  /// Returns `false` when the room does not exist, `true` otherwise.
  static bool joinRoom(String roomId, String playerId, WebSocket socket) {
    final target = _rooms[roomId];
    if (target == null) {
      return false;
    }

    final joining = Player(playerId, socket);
    _players[playerId] = joining;
    target.addPlayer(joining);
    return true;
  }

  /// Forwards [message] to the room [roomId] on behalf of [playerId].
  ///
  /// Ignored when either the room or the player is unknown.
  static void handleGameMessage(String roomId, String playerId, Map<String, dynamic> message) {
    final target = _rooms[roomId];
    final sender = _players[playerId];
    if (target == null || sender == null) {
      return;
    }

    target.handleMessage(sender, message);
  }

  /// Starts the game in the room [roomId]; ignored for unknown rooms.
  static void startGame(String roomId) {
    _rooms[roomId]?.startGame();
  }
}

/// A single game room: its players and the events broadcast between them.
class GameRoom {
  GameRoom(this.id, this.gameType, this.hostId);

  /// Identifier of this room.
  final String id;

  /// Game type given at creation.
  final String gameType;

  /// Id of the host given at creation.
  final String hostId;

  /// Players in the room, in join order.
  final List<Player> players = [];

  /// Whether [startGame] has been called.
  bool gameStarted = false;

  /// Adds [player] and announces it, with the new head count, to the room.
  void addPlayer(Player player) {
    players.add(player);
    broadcast({
      'type': 'player_joined',
      'player_id': player.id,
      'players_count': players.length
    });
  }

  /// Routes [message] from [sender] by its `action` field.
  ///
  /// Actions other than `move`, `attack` and `chat` are ignored.
  void handleMessage(Player sender, Map<String, dynamic> message) {
    // Process game-specific messages
    final routes = <String, void Function(Player, Map<String, dynamic>)>{
      'move': handleMove,
      'attack': handleAttack,
      'chat': handleChat,
    };

    final route = routes[message['action']];
    if (route != null) {
      route(sender, message);
    }
  }

  /// Broadcasts the `position` reported by [player].
  void handleMove(Player player, Map<String, dynamic> data) {
    final event = {
      'type': 'player_moved',
      'player_id': player.id,
      'position': data['position']
    };
    broadcast(event);
  }

  /// Broadcasts an attack by [attacker] with the `target_id` and `damage`
  /// taken from [data].
  void handleAttack(Player attacker, Map<String, dynamic> data) {
    final event = {
      'type': 'player_attacked',
      'attacker_id': attacker.id,
      'target_id': data['target_id'],
      'damage': data['damage']
    };
    broadcast(event);
  }

  /// Broadcasts the chat `message` sent by [sender].
  void handleChat(Player sender, Map<String, dynamic> data) {
    final event = {
      'type': 'chat_message',
      'player_id': sender.id,
      'message': data['message']
    };
    broadcast(event);
  }

  /// JSON-encodes [message] once and sends it to every player's socket.
  void broadcast(Map<String, dynamic> message) {
    final encoded = jsonEncode(message);
    for (final member in players) {
      member.socket.send(encoded);
    }
  }

  /// Marks the game as started and announces the player ids to the room.
  void startGame() {
    gameStarted = true;
    final playerIds = [for (final member in players) member.id];
    broadcast({
      'type': 'game_started',
      'players': playerIds
    });
  }
}

/// A player in a game room, paired with the socket used to reach it.
class Player {
  Player(this.id, this.socket);

  /// Identifier of the player.
  final String id;

  /// Socket that messages for this player are sent on.
  final WebSocket socket;

  /// JSON-encodes [message] and sends it on [socket].
  void send(Map<String, dynamic> message) {
    final encoded = jsonEncode(message);
    socket.send(encoded);
  }
}

// Counter appended to room ids so that ids generated within the same
// millisecond still differ.
int _roomIdSequence = 0;

/// Returns a room id of the form `room_<epoch millis>_<sequence>`.
///
/// The timestamp alone is not unique: two rooms created in the same
/// millisecond would share an id. The increasing sequence number keeps ids
/// distinct within one isolate.
String generateRoomId() {
  final sequence = _roomIdSequence++;
  return 'room_${DateTime.now().millisecondsSinceEpoch}_$sequence';
}

Best Practices

✅ Do's

  • Use rooms for organizing clients by context (chat rooms, game lobbies, etc.)
  • Implement proper authentication and authorization middleware
  • Use event subscriptions for targeted messaging
  • Handle errors gracefully and provide meaningful error messages
  • Use middleware for cross-cutting concerns like logging and rate limiting
  • Implement proper cleanup when clients disconnect
  • Use client context to store user-specific data
  • Test your WebSocket handlers thoroughly
  • Use meaningful event names and consistent message formats
  • Monitor connection counts and message throughput

❌ Don'ts

  • Don't send sensitive data without proper authentication
  • Don't block the event loop with long-running operations
  • Don't rely on client-side validation alone
  • Don't forget to handle client disconnections
  • Don't use WebSocket for file uploads (use HTTP instead)
  • Don't broadcast to all clients when you only need specific rooms
  • Don't ignore middleware errors - they can break the connection
  • Don't store large amounts of data in client context
  • Don't perform blocking synchronous operations (such as synchronous file I/O) inside async handlers
  • Don't forget to implement rate limiting for production

On this page