WebSocket System
A powerful real-time communication system for building interactive applications with WebSocket support, middleware pipeline, and room-based messaging.
Real-time Communication · Room-based Messaging · Middleware Pipeline · Event-driven Architecture
System Overview
WebSocket Architecture
Khadem's WebSocket system is built around a modular architecture with clear separation of concerns:
Core Components
- SocketServer: Main entry point for WebSocket connections
- SocketManager: Manages clients, rooms, and event routing
- SocketClient: Represents individual WebSocket connections
- SocketHandler: Processes incoming messages and executes middleware
- SocketMiddlewarePipeline: Executes middleware in priority order
Key Features
- Event-driven: Register handlers for specific events
- Room-based: Group clients and broadcast to rooms
- Middleware: Connection, message, room, and disconnect middleware
- Authentication: Built-in auth callbacks and user context
- Type-safe: Strongly typed event handlers and middleware
Message Format
All WebSocket messages use JSON format with `event` and `data` fields:
{
"event": "send_message",
"data": {
"message": "Hello World",
"room": "general"
}
}
Quick Start
Basic Server Setup
1. Create WebSocket Server
dart
import 'package:khadem/khadem_dart.dart';
/// Connection-level entry point used by the quick-start example.
class WebSocketController {
  /// Logs the new connection and greets [client] with a `welcome` event
  /// carrying the current time.
  static Future handleConnection(SocketClient client, Request request) async {
    print('New WebSocket connection: ${client.id} from ${request.ip}');

    // Build the greeting first, then push it to the freshly connected client.
    final greeting = <String, dynamic>{
      'event': 'welcome',
      'message': 'Welcome to Khadem WebSocket server!',
      'timestamp': DateTime.now().toIso8601String(),
    };
    client.sendJson(greeting);
  }
}
/// Builds a [SocketServer] with token authentication, a logging connection
/// middleware, `ping`/`echo` handlers and lifecycle hooks, then mounts it on
/// [server] under the `/ws` path.
void setupWebSocketServer(Server server) {
  final sockets = SocketServer();

  // Authentication: resolve the `token` query parameter to a user id, or
  // null when the token is missing or rejected by the verifier.
  sockets.useAuth((client, request) async {
    final token = request.query['token'];
    if (token == null) return null;
    final claims = await verifyJWTToken(token);
    return claims?['sub'];
  });

  // Connection middleware: log the remote address and accept the connection.
  sockets.use((client, request) async {
    print('Connection middleware: ${request.ip}');
    return true;
  });

  // `ping` is answered with a timestamped `pong`.
  sockets.on('ping', (client, data) async {
    final reply = <String, dynamic>{
      'event': 'pong',
      'timestamp': DateTime.now().toIso8601String(),
    };
    client.sendJson(reply);
  });

  // `echo` sends the received `message` back; requests without one are
  // ignored.
  sockets.on('echo', (client, data) async {
    final message = data['message'];
    if (message == null) return;
    client.sendJson(<String, dynamic>{
      'event': 'echo',
      'message': message,
      'timestamp': DateTime.now().toIso8601String(),
    });
  });

  // Lifecycle hooks only log the client id.
  sockets.onConnect((client) async {
    print('Client connected: ${client.id}');
  });
  sockets.onDisconnect((client) async {
    print('Client disconnected: ${client.id}');
  });

  // Hand every upgraded `/ws` connection over to the socket server.
  server.websocket(
    '/ws',
    (socket, request) => sockets.handleConnection(socket, request),
  );
}
/// Application entry point: creates the HTTP server, attaches the WebSocket
/// routes and starts listening on port 8080.
///
/// Returns `Future<void>` rather than `void` so that callers (and tests) can
/// await start-up and observe errors.
Future<void> main() async {
  final server = Server();

  // Setup WebSocket routes
  setupWebSocketServer(server);

  // Start server
  await server.listen(8080);
  print('Server running on port 8080 with WebSocket support');
}
/// Placeholder JWT verifier.
///
/// As written it performs no verification: every [token] resolves to the same
/// fixed claims (`sub` and `username`). Replace the body with real signature
/// and expiry checks; `null` is the value reserved for a rejected token.
Future<Map<String, dynamic>?> verifyJWTToken(String token) async {
  try {
    // Implement your JWT verification logic here.
    final claims = <String, dynamic>{
      'sub': 'user123',
      'username': 'john_doe',
    };
    return claims;
  } catch (e) {
    return null;
  }
}
2. Handle Events
dart
/// Registers the example event handlers and lifecycle hooks on a server.
class WebSocketController {
  /// Attaches `ping`, `echo` and `broadcast` handlers plus connect and
  /// disconnect hooks to [server].
  static Future handleConnection(SocketServer server) async {
    server
      // `ping` -> timestamped `pong`.
      ..on('ping', (SocketClient client, Map<String, dynamic> data) async {
        print('Received ping from ${client.id}');
        client.sendJson(<String, dynamic>{
          'event': 'pong',
          'timestamp': DateTime.now().toIso8601String(),
        });
      })
      // `echo` -> `echo_response` carrying the original message.
      ..on('echo', (SocketClient client, Map<String, dynamic> data) async {
        final message = data['message'];
        print('Received echo from ${client.id}: $message');
        client.sendJson(<String, dynamic>{
          'event': 'echo_response',
          'original_message': message,
          'timestamp': DateTime.now().toIso8601String(),
        });
      })
      // `broadcast` -> relay the message to every connected client.
      ..on('broadcast', (SocketClient client, Map<String, dynamic> data) async {
        final message = data['message'];
        print('Broadcast request from ${client.id}: $message');
        final outgoing = <String, dynamic>{
          'event': 'broadcast_message',
          'from': client.get('user_id'),
          'message': message,
          'timestamp': DateTime.now().toIso8601String(),
        };
        await server.broadcast(outgoing);
      })
      // Greet each new client with its own id.
      ..onConnect((SocketClient client) async {
        print('Client connected: ${client.id}');
        client.sendJson(<String, dynamic>{
          'event': 'connected',
          'client_id': client.id,
          'timestamp': DateTime.now().toIso8601String(),
        });
      })
      ..onDisconnect((SocketClient client) async {
        print('Client disconnected: ${client.id}');
      });
  }
}
// Usage
/// Installs the [WebSocketController] handlers on [server].
void setupWebSocketEvents(SocketServer server) =>
    WebSocketController.handleConnection(server);
3. Start Server
dart
import 'package:khadem/khadem_dart.dart';
/// Full example entry point: an HTTP server with a WebSocket endpoint at
/// `/ws` (token auth, `ping`, `join_room`, `send_message`) and an HTML landing
/// page at `/`.
///
/// Returns `Future<void>` so start-up can be awaited by callers.
Future<void> main() async {
  // Create HTTP server
  final server = Server();

  // Create WebSocket server
  final socketServer = SocketServer();

  // Setup authentication: resolve the `token` query parameter to a user id.
  socketServer.useAuth((client, request) async {
    final token = request.query['token'];
    if (token == null) return null;
    // Verify token and return user ID
    final payload = await verifyJWTToken(token);
    return payload?['sub'];
  });

  // Setup event handlers
  socketServer.on('ping', (client, data) async {
    client.sendJson({'event': 'pong'});
  });

  socketServer.on('join_room', (client, data) async {
    final roomName = data['room'];
    if (roomName != null) {
      await client.joinRoom(roomName);
      client.sendJson({
        'event': 'room_joined',
        'room': roomName
      });
    }
  });

  socketServer.on('send_message', (client, data) async {
    final roomName = data['room'];
    final message = data['message'];
    if (roomName != null && message != null) {
      // The sender is excluded so it does not receive its own message back.
      await socketServer.broadcastToRoom(roomName, {
        'event': 'message',
        'from': client.get('user_id'),
        'message': message,
        'timestamp': DateTime.now().toIso8601String()
      }, exclude: client);
    }
  });

  // Mount WebSocket server
  server.websocket('/ws', (socket, request) async {
    await socketServer.handleConnection(socket, request);
  });

  // Add HTTP routes
  server.get('/', (request) async {
    // The page is plain HTML: the client example is embedded as preformatted
    // text rather than as a template component a browser cannot render.
    final html = '''
<!DOCTYPE html>
<html>
<head>
<title>Khadem WebSocket Server</title>
</head>
<body>
<h1>Khadem WebSocket Server</h1>
<p>WebSocket endpoint: <code>ws://localhost:8080/ws</code></p>
<h2>Basic WebSocket Client Example</h2>
<pre><code>const socket = new WebSocket("ws://localhost:8080/ws");
socket.onopen = () =&gt; socket.send(JSON.stringify({event: "ping", data: {}}));
socket.onmessage = (msg) =&gt; console.log(JSON.parse(msg.data));
</code></pre>
</body>
</html>
''';
    return Response.html(html);
  });

  // Start server
  await server.listen(8080);
  print('Server running on http://localhost:8080');
  print('WebSocket endpoint: ws://localhost:8080/ws');
}
/// Placeholder JWT verifier.
///
/// No verification happens yet: any [token] yields the same fixed claims
/// (`sub`, `username`). A real implementation should return `null` for a
/// token it rejects.
Future<Map<String, dynamic>?> verifyJWTToken(String token) async {
  try {
    // Implement your JWT verification logic here.
    final claims = <String, dynamic>{
      'sub': 'user123',
      'username': 'john_doe',
    };
    return claims;
  } catch (e) {
    return null;
  }
}
Room-based Messaging
Room Management
dart
/// Room-based chat built on [SocketServer] (event registration) and
/// [SocketManager] (room broadcasts and membership lookups).
class ChatServer {
  final SocketServer server;
  final SocketManager manager;

  /// Stores the collaborators and immediately registers the room handlers.
  ChatServer(this.server, this.manager) {
    setupEventHandlers();
  }

  /// Registers the `join_room`, `leave_room` and `room_message` handlers.
  void setupEventHandlers() {
    server.on('join_room', _onJoinRoom);
    server.on('leave_room', _onLeaveRoom);
    server.on('room_message', _onRoomMessage);
  }

  /// Adds [client] to the requested room, announces it to the other members
  /// and replies with the room roster. Failures are reported as `error`.
  Future<void> _onJoinRoom(
    SocketClient client,
    Map<String, dynamic> data,
  ) async {
    final roomName = data['room'] as String?;
    if (roomName == null) return;

    try {
      await client.joinRoom(roomName);

      // Everyone except the joiner hears about the new member.
      await manager.broadcastToRoom(roomName, <String, dynamic>{
        'event': 'user_joined',
        'user_id': client.get('user_id'),
        'username': client.get('username'),
        'room': roomName,
        'timestamp': DateTime.now().toIso8601String(),
      }, exclude: client);

      // The joiner receives the current roster.
      final members = manager.getClientsInRoom(roomName);
      client.sendJson(<String, dynamic>{
        'event': 'room_joined',
        'room': roomName,
        'user_count': members.length,
        'users': [
          for (final member in members)
            {
              'user_id': member.get('user_id'),
              'username': member.get('username'),
            },
        ],
      });
    } catch (e) {
      client.sendJson(<String, dynamic>{
        'event': 'error',
        'message': 'Failed to join room: ${e.toString()}',
      });
    }
  }

  /// Removes [client] from the requested room and tells the remaining
  /// members. Failures are reported to the client as `error`.
  Future<void> _onLeaveRoom(
    SocketClient client,
    Map<String, dynamic> data,
  ) async {
    final roomName = data['room'] as String?;
    if (roomName == null) return;

    try {
      await client.leaveRoom(roomName);
      await manager.broadcastToRoom(roomName, <String, dynamic>{
        'event': 'user_left',
        'user_id': client.get('user_id'),
        'username': client.get('username'),
        'room': roomName,
        'timestamp': DateTime.now().toIso8601String(),
      });
    } catch (e) {
      client.sendJson(<String, dynamic>{
        'event': 'error',
        'message': 'Failed to leave room: ${e.toString()}',
      });
    }
  }

  /// Relays a chat message to the other members of a room the sender has
  /// already joined; non-members get an `error` reply instead.
  Future<void> _onRoomMessage(
    SocketClient client,
    Map<String, dynamic> data,
  ) async {
    final roomName = data['room'] as String?;
    final message = data['message'] as String?;
    if (roomName == null || message == null) return;

    final isMember = client.isInRoom(roomName);
    if (!isMember) {
      client.sendJson(<String, dynamic>{
        'event': 'error',
        'message': 'You are not in this room',
      });
      return;
    }

    await manager.broadcastToRoom(roomName, <String, dynamic>{
      'event': 'room_message',
      'user_id': client.get('user_id'),
      'username': client.get('username'),
      'message': message,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String(),
    }, exclude: client);
  }
}
// Usage
/// Creates the socket server and manager, installs the chat handlers and
/// token authentication, then starts listening on port 8080.
///
/// Returns `Future<void>` so start-up can be awaited by callers.
Future<void> main() async {
  final server = SocketServer();
  final manager = SocketManager();

  // The constructor registers the handlers as a side effect, so the instance
  // itself does not need to be kept.
  // NOTE(review): `manager` is constructed independently of `server`; confirm
  // it is the same manager the server routes clients through, otherwise room
  // broadcasts will not reach anyone.
  ChatServer(server, manager);

  // Add middleware: resolve the `token` query parameter to a user id.
  server.useAuth((client, request) async {
    final token = request.query['token'];
    if (token == null) return null;
    // Verify token and return user ID
    final payload = await verifyJWTToken(token);
    return payload?['sub'];
  });

  // Start server
  // NOTE(review): other examples mount SocketServer through
  // `Server.websocket`; verify that `SocketServer.listen` exists.
  await server.listen(8080);
  print('WebSocket server running on port 8080');
}
Rooms allow you to group clients and broadcast messages to specific groups.
Broadcasting
dart
/// Convenience wrappers around [SocketManager] for fan-out messaging and
/// simple statistics.
class BroadcastingService {
  final SocketManager manager;

  BroadcastingService(this.manager);

  /// Sends [message] to every connected client.
  Future<void> broadcastToAll(Map<String, dynamic> message) async {
    await manager.broadcast(message);
  }

  /// Sends [message] to the members of [roomName], optionally skipping
  /// [exclude].
  Future<void> broadcastToRoom(
    String roomName,
    Map<String, dynamic> message, {
    SocketClient? exclude,
  }) async {
    await manager.broadcastToRoom(roomName, message, exclude: exclude);
  }

  /// Sends [message] to the client registered for [userId]; does nothing when
  /// the manager knows no such client.
  Future<void> broadcastToUser(
    String userId,
    Map<String, dynamic> message,
  ) async {
    manager.getClientByUserId(userId)?.sendJson(message);
  }

  /// Sends [message] to each of [userIds] in turn, one after another.
  Future<void> broadcastToUsers(
    List<String> userIds,
    Map<String, dynamic> message,
  ) async {
    await Future.forEach<String>(
      userIds,
      (id) => broadcastToUser(id, message),
    );
  }

  /// Sends [message] to every client whose `user_id` is set and not listed in
  /// [excludeUserIds]. Clients without a `user_id` are skipped as well.
  Future<void> broadcastExcept(
    List<String> excludeUserIds,
    Map<String, dynamic> message,
  ) async {
    final recipients = manager.getAllClients().where((candidate) {
      final userId = candidate.get('user_id');
      return userId != null && !excludeUserIds.contains(userId);
    });
    for (final recipient in recipients) {
      recipient.sendJson(message);
    }
  }

  /// Returns the member count and member user ids of [roomName].
  Map<String, dynamic> getRoomStats(String roomName) {
    final members = manager.getClientsInRoom(roomName);
    return {
      'room': roomName,
      'user_count': members.length,
      'users': [for (final member in members) member.get('user_id')],
    };
  }

  /// Returns global client/room totals plus per-room statistics.
  Map<String, dynamic> getServerStats() {
    return {
      'total_clients': manager.getTotalClients(),
      'total_rooms': manager.getTotalRooms(),
      'rooms': [
        for (final roomName in manager.getAllRooms()) getRoomStats(roomName),
      ],
    };
  }
}
// Usage example
/// Registers the `broadcast` and `get_stats` handlers on [server], backed by
/// a [BroadcastingService] over [manager].
void setupBroadcasting(SocketServer server, SocketManager manager) {
  final broadcaster = BroadcastingService(manager);

  // `broadcast`: route the message by `target.type` (all / room / user).
  server.on('broadcast', (SocketClient client, Map<String, dynamic> data) async {
    final target = data['target'];
    final message = data['message'];
    if (target == null || message == null) return;

    try {
      final kind = target['type'];
      if (kind == 'all') {
        await broadcaster.broadcastToAll({
          'event': 'broadcast',
          'from': client.get('user_id'),
          'message': message,
        });
      } else if (kind == 'room') {
        final roomName = target['room'];
        if (roomName != null) {
          // The sender does not get its own room broadcast back.
          await broadcaster.broadcastToRoom(roomName, {
            'event': 'room_broadcast',
            'from': client.get('user_id'),
            'message': message,
          }, exclude: client);
        }
      } else if (kind == 'user') {
        final userId = target['user_id'];
        if (userId != null) {
          await broadcaster.broadcastToUser(userId, {
            'event': 'direct_message',
            'from': client.get('user_id'),
            'message': message,
          });
        }
      }
      // Any other target type is ignored.
    } catch (e) {
      client.sendJson({
        'event': 'error',
        'message': 'Broadcast failed: ${e.toString()}',
      });
    }
  });

  // `get_stats`: reply with the server-wide statistics snapshot.
  server.on('get_stats', (SocketClient client, Map<String, dynamic> data) async {
    client.sendJson({
      'event': 'stats',
      'data': broadcaster.getServerStats(),
    });
  });
}
Send messages to all clients in a room or to individual clients.
Event Subscription System
dart
/// Publish/subscribe layer on top of [SocketServer].
///
/// Clients opt in to named events with `subscribe`/`unsubscribe`; any client
/// can publish with `broadcast_event`. Subscriptions are kept in memory only.
class EventSubscriptionService {
  final SocketServer server;

  /// Subscribers per event name. Empty sets are removed eagerly.
  final Map<String, Set<SocketClient>> _eventSubscribers = {};

  /// Stores [server] and immediately registers the subscription handlers.
  EventSubscriptionService(this.server) {
    setupEventHandlers();
  }

  /// Registers the `subscribe`, `unsubscribe` and `broadcast_event` handlers
  /// and the disconnect hook that drops a client's subscriptions.
  void setupEventHandlers() {
    // Handle event subscription
    server.on('subscribe', (SocketClient client, Map<String, dynamic> data) async {
      final event = data['event'] as String?;
      if (event == null) return;

      _eventSubscribers.putIfAbsent(event, () => {}).add(client);
      client.sendJson({
        'event': 'subscribed',
        'subscription_event': event,
        'timestamp': DateTime.now().toIso8601String()
      });
      print('Client ${client.id} subscribed to event: $event');
    });

    // Handle event unsubscription
    server.on('unsubscribe', (SocketClient client, Map<String, dynamic> data) async {
      final event = data['event'] as String?;
      if (event == null) return;

      final subscribers = _eventSubscribers[event];
      if (subscribers != null) {
        subscribers.remove(client);
        if (subscribers.isEmpty) {
          _eventSubscribers.remove(event);
        }
      }
      // Acknowledged even when the client was not subscribed.
      client.sendJson({
        'event': 'unsubscribed',
        'subscription_event': event,
        'timestamp': DateTime.now().toIso8601String()
      });
      print('Client ${client.id} unsubscribed from event: $event');
    });

    // Handle event broadcasting
    server.on('broadcast_event', (SocketClient client, Map<String, dynamic> data) async {
      final event = data['event'] as String?;
      final eventData = data['data'];
      if (event == null) return;

      // The publisher is excluded from its own broadcast.
      await broadcastToEvent(event, {
        'event': event,
        'data': eventData,
        'from_user': client.get('user_id'),
        'timestamp': DateTime.now().toIso8601String()
      }, exclude: client);
    });

    // Handle client disconnection
    server.onDisconnect((SocketClient client) async {
      // Remove client from all subscriptions
      for (final subscribers in _eventSubscribers.values) {
        subscribers.remove(client);
      }
      // Clean up empty subscription sets
      _eventSubscribers.removeWhere((event, subscribers) => subscribers.isEmpty);
    });
  }

  /// Sends [message] to every connected subscriber of [event] except
  /// [exclude].
  ///
  /// Subscribers that report `isConnected == false`, or whose send throws,
  /// are pruned from the subscription set so stale clients do not pile up.
  Future<void> broadcastToEvent(String event, Map<String, dynamic> message, {
    SocketClient? exclude
  }) async {
    final subscribers = _eventSubscribers[event];
    if (subscribers == null) return;

    // Collected first and removed afterwards: the set must not be mutated
    // while it is being iterated.
    final staleClients = <SocketClient>[];
    for (final subscriber in subscribers) {
      if (!subscriber.isConnected) {
        staleClients.add(subscriber);
        continue;
      }
      if (subscriber == exclude) continue;
      try {
        subscriber.sendJson(message);
      } catch (e) {
        print('Failed to send to subscriber: $e');
        staleClients.add(subscriber);
      }
    }

    subscribers.removeAll(staleClients);
    if (subscribers.isEmpty) {
      _eventSubscribers.remove(event);
    }
  }

  /// Returns the number of events with subscribers and, per event, the
  /// subscriber count and subscriber user ids.
  Map<String, dynamic> getSubscriptionStats() {
    return {
      'total_events': _eventSubscribers.length,
      'events': _eventSubscribers.map((event, subscribers) => MapEntry(event, {
        'subscriber_count': subscribers.length,
        'subscribers': subscribers.map((c) => c.get('user_id')).toList()
      }))
    };
  }

  /// Whether [client] is currently subscribed to [event].
  bool isSubscribed(SocketClient client, String event) =>
      _eventSubscribers[event]?.contains(client) ?? false;

  /// Returns the names of all events [client] is subscribed to.
  List<String> getClientSubscriptions(SocketClient client) => [
        for (final entry in _eventSubscribers.entries)
          if (entry.value.contains(client)) entry.key,
      ];
}
// Usage example
/// Creates an [EventSubscriptionService] for [server] and registers the
/// presence (`user_online` / `user_offline`) and statistics handlers.
void setupEventSubscription(SocketServer server) {
  final eventService = EventSubscriptionService(server);

  // Publishes the client's presence [status] to `user_status` subscribers.
  Future<void> announce(SocketClient client, String status) {
    return eventService.broadcastToEvent('user_status', <String, dynamic>{
      'user_id': client.get('user_id'),
      'status': status,
      'timestamp': DateTime.now().toIso8601String(),
    });
  }

  // Predefined presence events.
  server.on('user_online', (SocketClient client, Map<String, dynamic> data) async {
    await announce(client, 'online');
  });
  server.on('user_offline', (SocketClient client, Map<String, dynamic> data) async {
    await announce(client, 'offline');
  });

  // `get_subscription_stats`: reply with the current subscription snapshot.
  server.on('get_subscription_stats', (SocketClient client, Map<String, dynamic> data) async {
    client.sendJson(<String, dynamic>{
      'event': 'subscription_stats',
      'data': eventService.getSubscriptionStats(),
    });
  });
}
Event Subscription Features
- Subscribe clients to specific events for targeted messaging
- Broadcast events to all subscribed clients
- Check subscription status and subscriber counts
- Automatic cleanup when clients disconnect
- Real-time subscription management
Middleware Pipeline
Middleware Types
dart
/// Base class for socket middleware as presented in this guide.
///
/// Every hook has a permissive default, so subclasses override only the
/// stages they care about. The three `bool` hooks return `true` by default.
abstract class SocketMiddleware {
  /// Connection stage; the default returns `true`.
  Future<bool> handleConnection(SocketClient client, Request request) async =>
      true;

  /// Message stage, given the raw message text; the default returns `true`.
  Future<bool> handleMessage(SocketClient client, String message) async =>
      true;

  /// Room-join stage; the default returns `true`.
  Future<bool> handleRoomJoin(SocketClient client, String roomName) async =>
      true;

  /// Disconnect stage; the default does nothing.
  Future<void> handleDisconnect(SocketClient client) async {}
}
/// Connection middleware that caps the number of open connections per
/// remote IP address.
class ConnectionLimitMiddleware extends SocketMiddleware {
  /// Open-connection counters, shared by all instances of this middleware.
  static final Map<String, int> _connectionsPerIP = {};

  final int maxConnectionsPerIP = 10;

  /// Accepts the connection while the IP is under the cap, remembering the
  /// IP on the client; otherwise sends `connection_rejected` and refuses.
  @override
  Future<bool> handleConnection(SocketClient client, Request request) async {
    final ip = request.ip;
    final openConnections = _connectionsPerIP[ip] ?? 0;

    if (openConnections < maxConnectionsPerIP) {
      _connectionsPerIP[ip] = openConnections + 1;
      // Stored so the disconnect hook knows which counter to decrement.
      client.set('ip', ip);
      return true;
    }

    client.sendJson(<String, dynamic>{
      'event': 'connection_rejected',
      'reason': 'Too many connections from this IP',
    });
    return false;
  }

  /// Decrements the counter of the IP stored on [client], dropping the entry
  /// once it reaches zero.
  @override
  Future<void> handleDisconnect(SocketClient client) async {
    final ip = client.get('ip');
    if (ip == null || !_connectionsPerIP.containsKey(ip)) return;

    final remaining = _connectionsPerIP[ip]! - 1;
    if (remaining == 0) {
      _connectionsPerIP.remove(ip);
    } else {
      _connectionsPerIP[ip] = remaining;
    }
  }
}
/// Message middleware that rejects frames which are not a JSON object with a
/// non-empty string `event` and a `data` field.
class MessageValidationMiddleware extends SocketMiddleware {
  /// Returns `true` when [message] is well-formed; otherwise sends an
  /// `error` event describing the problem to [client] and returns `false`.
  @override
  Future<bool> handleMessage(SocketClient client, String message) async {
    Object? decoded;
    try {
      decoded = jsonDecode(message);
    } on FormatException {
      // Only genuine parse failures are reported as invalid JSON.
      _reject(client, 'Invalid JSON format');
      return false;
    }

    // Valid JSON that is not an object (array, number, string, ...) cannot
    // carry the required fields, so it is a format error rather than a
    // JSON error.
    if (decoded is! Map ||
        !decoded.containsKey('event') ||
        !decoded.containsKey('data')) {
      _reject(client, 'Invalid message format: missing event or data field');
      return false;
    }

    // Validate event type
    final event = decoded['event'];
    if (event is! String || event.isEmpty) {
      _reject(client, 'Invalid event type');
      return false;
    }
    return true;
  }

  /// Sends an `error` event carrying [reason] to [client].
  void _reject(SocketClient client, String reason) {
    client.sendJson({
      'event': 'error',
      'message': reason
    });
  }
}
/// Room-join middleware that authorises joins by room-name prefix:
/// `public:` rooms are open, `private:<userId>` rooms belong to one user,
/// `admin:` rooms require the `is_admin` flag, and all other rooms are open.
class RoomPermissionMiddleware extends SocketMiddleware {
  /// Returns whether [client] may join [roomName]; denials for private and
  /// admin rooms are reported to the client as `room_join_denied`.
  @override
  Future<bool> handleRoomJoin(SocketClient client, String roomName) async {
    final userId = client.get('user_id');

    // Public rooms (no permission check)
    if (roomName.startsWith('public:')) {
      return true;
    }

    // Private rooms (user-specific). A client without a user id owns no
    // private room; without the null check it would match the room literally
    // named `private:null`.
    if (roomName.startsWith('private:')) {
      final isOwner = userId != null && roomName == 'private:$userId';
      if (!isOwner) {
        _deny(client, roomName, 'Private room access denied');
      }
      return isOwner;
    }

    // Admin rooms
    if (roomName.startsWith('admin:')) {
      final isAdmin = client.get('is_admin') == true;
      if (!isAdmin) {
        _deny(client, roomName, 'Admin access required');
        return false;
      }
    }
    return true;
  }

  /// Tells [client] that joining [roomName] was refused and why.
  void _deny(SocketClient client, String roomName, String reason) {
    client.sendJson({
      'event': 'room_join_denied',
      'room': roomName,
      'reason': reason
    });
  }
}
/// Middleware that logs connections, messages and disconnects to stdout and
/// never blocks anything.
class LoggingMiddleware extends SocketMiddleware {
  /// Longest message prefix, in characters, that is written to the log.
  static const int _previewLength = 100;

  /// Logs the remote IP and the current time, then accepts the connection.
  @override
  Future<bool> handleConnection(SocketClient client, Request request) async {
    print('WebSocket connection from ${request.ip} at ${DateTime.now()}');
    return true;
  }

  /// Logs the sender and at most the first 100 characters of [message].
  @override
  Future<bool> handleMessage(SocketClient client, String message) async {
    final userId = client.get('user_id') ?? 'anonymous';
    // Truncated inline so the snippet does not need `min` from `dart:math`,
    // which it never imports.
    final preview = message.length > _previewLength
        ? message.substring(0, _previewLength)
        : message;
    print('Message from user $userId: $preview');
    return true;
  }

  /// Logs which user (or `anonymous`) disconnected.
  @override
  Future<void> handleDisconnect(SocketClient client) async {
    final userId = client.get('user_id') ?? 'anonymous';
    print('User $userId disconnected');
  }
}
Authentication Middleware
dart
/// Class-based authentication middleware (an alternative to registering a
/// callback with `server.useAuth()`).
///
/// Reads a token from the `token` query parameter or the `Authorization`
/// header, verifies it with `verifyJWTToken`, and stores the resulting user
/// details in the client context.
class AuthMiddleware extends SocketMiddleware {
  /// Matches the `Bearer ` scheme prefix of an `Authorization` header value.
  static final RegExp _bearerPrefix = RegExp(r'^Bearer\s+', caseSensitive: false);

  /// Returns `true` and populates `user_id`, `username`, `is_admin` and
  /// `authenticated` on [client] when the token verifies; otherwise sends
  /// `auth_required` or `auth_failed` and returns `false`.
  @override
  Future<bool> handleConnection(SocketClient client, Request request) async {
    // Check for authentication token in query parameters or headers. Header
    // values normally look like `Bearer <token>`; the scheme is stripped so
    // the verifier receives the bare token.
    final rawToken = request.query['token'] ?? request.headers['authorization'];
    final token = rawToken?.replaceFirst(_bearerPrefix, '').trim();
    if (token == null || token.isEmpty) {
      client.sendJson({
        'event': 'auth_required',
        'message': 'Authentication token required'
      });
      return false;
    }

    try {
      // Verify JWT token (implement your JWT verification logic)
      final payload = await verifyJWTToken(token);
      if (payload == null) {
        throw Exception('Invalid token');
      }

      // Store user information in client context
      client.set('user_id', payload['sub']);
      client.set('username', payload['username']);
      client.set('is_admin', payload['role'] == 'admin');
      client.set('authenticated', true);

      // Send authentication success
      client.sendJson({
        'event': 'authenticated',
        'user_id': payload['sub'],
        'username': payload['username']
      });
      return true;
    } catch (e) {
      // NOTE(review): the exception text is echoed to the client; consider a
      // generic message in production.
      client.sendJson({
        'event': 'auth_failed',
        'message': 'Authentication failed: $e'
      });
      return false;
    }
  }
}
// Usage with SocketServer
/// Creates a [SocketServer] with token authentication, a pass-through
/// connection middleware and the `join_room` / `send_message` handlers.
void setupWebSocketServer() {
  final server = SocketServer();

  // Authentication: user id for a verified token, null otherwise.
  server.useAuth((SocketClient client, Request request) async {
    final token = request.query['token'];
    if (token == null) return null;
    final claims = await verifyJWTToken(token);
    return claims?['sub'];
  });

  // Additional connection middleware; accepts every connection.
  server.use((SocketClient client, Request request) async {
    return true;
  });

  // `join_room`: join and confirm; requests without a room are ignored.
  server.on('join_room', (SocketClient client, Map<String, dynamic> data) async {
    final roomName = data['room'];
    if (roomName == null) return;
    await client.joinRoom(roomName);
    client.sendJson(<String, dynamic>{
      'event': 'room_joined',
      'room': roomName,
    });
  });

  // `send_message`: relay to the room, skipping the sender.
  server.on('send_message', (SocketClient client, Map<String, dynamic> data) async {
    final roomName = data['room'];
    final message = data['message'];
    if (roomName == null || message == null) return;
    final outgoing = <String, dynamic>{
      'event': 'message',
      'user_id': client.get('user_id'),
      'username': client.get('username'),
      'message': message,
      'timestamp': DateTime.now().toIso8601String(),
    };
    await server.broadcastToRoom(roomName, outgoing, exclude: client);
  });
}
/// Placeholder JWT verifier - use a proper JWT library in real code.
///
/// No verification happens yet: any [token] yields the same fixed claims
/// (`sub`, `username`, `role`). A real implementation should return `null`
/// for a token it rejects.
Future<Map<String, dynamic>?> verifyJWTToken(String token) async {
  try {
    // Implement your JWT verification logic here.
    final claims = <String, dynamic>{
      'sub': 'user123',
      'username': 'john_doe',
      'role': 'user',
    };
    return claims;
  } catch (e) {
    return null;
  }
}
Rate Limiting Middleware
dart
/// Message middleware that throttles each (user, event) pair with its own
/// [RateLimiter] using a one-minute window.
class RateLimitMiddleware extends SocketMiddleware {
  /// One limiter per `userId:event` key, shared by all instances.
  static final Map<String, RateLimiter> _rateLimiters = {};

  /// Per-event request budgets; events not listed here get 30 per minute.
  final Map<String, int> _limits = {
    'message': 60, // 60 messages per minute
    'join_room': 10, // 10 room joins per minute
    'leave_room': 10, // 10 room leaves per minute
    'ping': 120, // 120 pings per minute
  };

  /// Returns `false` (after sending `rate_limited`) when the sender exceeded
  /// the budget for the message's event; returns `true` otherwise, including
  /// for messages that fail to parse or carry no event.
  @override
  Future<bool> handleMessage(SocketClient client, String message) async {
    try {
      final payload = jsonDecode(message);
      final event = payload['event'] as String?;
      if (event == null) return true;

      // Unauthenticated clients are tracked by their connection id.
      final userId = client.get('user_id') ?? client.id;
      final budget = _limits[event] ?? 30; // Default 30 requests per minute
      final limiter = _rateLimiters['$userId:$event'] ??=
          RateLimiter(maxRequests: budget, window: Duration(minutes: 1));

      final verdict = await limiter.check(userId);
      if (verdict.allowed) return true;

      client.sendJson(<String, dynamic>{
        'event': 'rate_limited',
        'message': 'Too many $event requests. Try again later.',
        'retry_after_seconds': verdict.retryAfter?.inSeconds ?? 60,
      });
      return false;
    } catch (e) {
      // Anything that goes wrong here lets the message through.
      return true;
    }
  }
}
/// Sliding-window rate limiter that tracks requests separately for each
/// identifier.
///
/// Each identifier may make at most [maxRequests] requests within any
/// [window]. Histories are kept in memory and pruned lazily, on the next
/// check for the same identifier.
class RateLimiter {
  final int maxRequests;
  final Duration window;

  /// Timestamps of accepted requests, oldest first, keyed by identifier.
  final Map<String, List<DateTime>> _requestsByIdentifier = {};

  RateLimiter({required this.maxRequests, required this.window});

  /// Records a request for [identifier] and reports whether it is allowed.
  ///
  /// A denied request is not recorded. For a denial,
  /// [RateLimitResult.retryAfter] is the time until the oldest recorded
  /// request leaves the window, or the whole [window] when [maxRequests] is
  /// zero or negative and nothing can ever be recorded.
  Future<RateLimitResult> check(String identifier) async {
    final now = DateTime.now();
    final history = _requestsByIdentifier.putIfAbsent(identifier, () => []);

    // Remove old requests outside the window
    history.removeWhere((time) => now.difference(time) > window);

    if (history.length >= maxRequests) {
      if (history.isEmpty) {
        // Only reachable when maxRequests <= 0: nothing is ever recorded, so
        // there is no oldest request to compute a reset time from.
        _requestsByIdentifier.remove(identifier);
        return RateLimitResult(allowed: false, retryAfter: window);
      }
      final resetTime = history.first.add(window);
      return RateLimitResult(
        allowed: false,
        retryAfter: resetTime.difference(now),
      );
    }

    history.add(now);
    return RateLimitResult(allowed: true);
  }
}

/// Outcome of a [RateLimiter.check] call.
class RateLimitResult {
  /// Whether the request may proceed.
  final bool allowed;

  /// How long to wait before retrying; null when the request was allowed.
  final Duration? retryAfter;

  RateLimitResult({required this.allowed, this.retryAfter});
}
// Usage with SocketServer
/// Creates a [SocketServer] with a pass-through connection middleware and a
/// connection limiter allowing 100 connection attempts per minute.
void setupRateLimitedServer() {
  final server = SocketServer();

  // Pass-through connection middleware; per-message limiting is the job of
  // RateLimitMiddleware.
  server.use((SocketClient client, Request request) async {
    return true;
  });

  // Connection limiting: every attempt is checked against the limiter, keyed
  // by the remote IP.
  final connectionLimiter =
      RateLimiter(maxRequests: 100, window: Duration(minutes: 1));
  server.use((SocketClient client, Request request) async {
    final verdict = await connectionLimiter.check(request.ip);
    if (verdict.allowed) return true;

    client.sendJson(<String, dynamic>{
      'event': 'connection_rate_limited',
      'message': 'Too many connections from this IP',
    });
    return false;
  });
}
Client Management
dart
/// In-memory registry of connected clients and per-client metadata, keyed by
/// user id (or by connection id for clients without a `user_id`).
class ClientManager {
  final SocketManager manager;
  final Map<String, SocketClient> _clients = {};
  final Map<String, Map<String, dynamic>> _clientMetadata = {};

  ClientManager(this.manager);

  /// Registers [client] and initialises its metadata record.
  ///
  /// A later client with the same key replaces the earlier registration.
  void addClient(SocketClient client) {
    final userId = client.get('user_id') ?? client.id;
    _clients[userId] = client;

    // Store metadata
    _clientMetadata[userId] = {
      'connected_at': DateTime.now().toIso8601String(),
      'ip': client.get('ip'),
      'user_agent': client.get('user_agent'),
      'rooms': <String>[],
      'last_activity': DateTime.now().toIso8601String()
    };
    print('Client added: $userId');
  }

  /// Unregisters the client stored under [userId], asking it to leave every
  /// room listed in its metadata. Unknown ids are ignored.
  void removeClient(String userId) {
    final client = _clients.remove(userId);
    if (client != null) {
      // Leave all rooms
      // NOTE(review): nothing in this class ever adds to the `rooms`
      // metadata list, so unless callers maintain it through
      // updateClientMetadata this loop has nothing to do. The leaveRoom
      // futures are also not awaited - confirm that is intended.
      final metadata = _clientMetadata[userId];
      if (metadata != null) {
        final rooms = List<String>.from(metadata['rooms'] ?? []);
        for (final room in rooms) {
          client.leaveRoom(room);
        }
      }
      _clientMetadata.remove(userId);
      print('Client removed: $userId');
    }
  }

  /// Returns the client registered under [userId], if any.
  SocketClient? getClient(String userId) {
    return _clients[userId];
  }

  /// Returns a snapshot list of all registered clients.
  List<SocketClient> getAllClients() {
    return _clients.values.toList();
  }

  /// Sets metadata [key] to [value] for [userId] and refreshes
  /// `last_activity`. Unknown ids are ignored.
  void updateClientMetadata(String userId, String key, dynamic value) {
    final metadata = _clientMetadata[userId];
    if (metadata != null) {
      metadata[key] = value;
      metadata['last_activity'] = DateTime.now().toIso8601String();
    }
  }

  /// Returns the live metadata map for [userId], or null when unknown.
  Map<String, dynamic>? getClientMetadata(String userId) {
    return _clientMetadata[userId];
  }

  /// Broadcasts [message] through the underlying [manager].
  Future<void> broadcastToAll(Map<String, dynamic> message) async {
    await manager.broadcast(message);
  }

  /// Sends [message] to the client registered under [userId] if it is still
  /// connected; otherwise does nothing.
  Future<void> sendToClient(String userId, Map<String, dynamic> message) async {
    final client = _clients[userId];
    if (client != null && client.isConnected) {
      client.sendJson(message);
    }
  }

  /// Returns totals of registered, active and inactive clients plus a
  /// per-client summary taken from the metadata records.
  Map<String, dynamic> getStats() {
    final activeClients = _clients.values.where((c) => c.isConnected).length;
    return {
      'total_clients': _clients.length,
      'active_clients': activeClients,
      'inactive_clients': _clients.length - activeClients,
      'clients': _clientMetadata.entries.map((entry) => {
        'user_id': entry.key,
        'connected_at': entry.value['connected_at'],
        'last_activity': entry.value['last_activity'],
        'rooms': entry.value['rooms'],
        'is_active': _clients[entry.key]?.isConnected ?? false
      }).toList()
    };
  }

  /// Removes every registered client that is no longer connected.
  void cleanupInactiveClients() {
    // Collect the keys first: removeClient mutates the map being inspected.
    final inactiveClients = [
      for (final entry in _clients.entries)
        if (!entry.value.isConnected) entry.key,
    ];
    for (final userId in inactiveClients) {
      removeClient(userId);
    }
  }
}
/// Illustrative helpers for [SocketClient] that keep the joined-room list in
/// the client context under the `rooms` key.
///
/// The real `SocketClient` is expected to provide these itself; the bodies
/// below only show the bookkeeping.
extension SocketClientExtensions on SocketClient {
  /// Whether the `rooms` context entry contains [roomName].
  bool isInRoom(String roomName) {
    final joined = get('rooms');
    return joined?.contains(roomName) ?? false;
  }

  /// Returns a copy of the joined-room list (empty when none is stored).
  List<String> getRooms() => List<String>.from(get('rooms') ?? []);

  /// Adds [roomName] to the stored room list unless it is already there.
  Future<void> joinRoom(String roomName) async {
    final rooms = getRooms();
    if (rooms.contains(roomName)) return;
    set('rooms', rooms..add(roomName));
  }

  /// Removes [roomName] from the stored room list and writes the list back.
  Future<void> leaveRoom(String roomName) async {
    set('rooms', getRooms()..remove(roomName));
  }

  /// Encodes [data] as JSON and sends it; failures are logged, not thrown.
  void sendJson(Map<String, dynamic> data) {
    try {
      send(jsonEncode(data));
    } catch (e) {
      print('Failed to send JSON message: $e');
    }
  }
}
// Usage example
/// Connects a [ClientManager] to [server]: tracks connects and disconnects
/// and answers `get_client_info` and `get_client_stats` requests.
void setupClientManagement(SocketServer server, SocketManager manager) {
  final clientManager = ClientManager(manager);

  // Track the new client, then greet it.
  server.onConnect((SocketClient client) async {
    clientManager.addClient(client);
    final welcome = <String, dynamic>{
      'event': 'welcome',
      'user_id': client.get('user_id'),
      'timestamp': DateTime.now().toIso8601String(),
    };
    client.sendJson(welcome);
  });

  // Forget the client, using the same key it was registered under.
  server.onDisconnect((SocketClient client) async {
    clientManager.removeClient(client.get('user_id') ?? client.id);
  });

  // `get_client_info`: reply with the metadata of the requested user;
  // requests for a missing or unknown user id get no reply.
  server.on('get_client_info', (SocketClient client, Map<String, dynamic> data) async {
    final userId = data['user_id'] as String?;
    if (userId == null) return;

    final metadata = clientManager.getClientMetadata(userId);
    if (metadata == null) return;

    client.sendJson(<String, dynamic>{
      'event': 'client_info',
      'user_id': userId,
      'data': metadata,
    });
  });

  // `get_client_stats`: reply with the registry statistics.
  server.on('get_client_stats', (SocketClient client, Map<String, dynamic> data) async {
    client.sendJson(<String, dynamic>{
      'event': 'client_stats',
      'data': clientManager.getStats(),
    });
  });
}
Client Context Features
Context Storage
- set(key, value) - Store data in client context
- get(key) - Retrieve data from context
- isAuthorized - Check authorization status
- isAuthenticated - Check authentication status
Header Access
- authToken - Get authorization token
- userAgent - Get user agent string
- getHeader(name) - Get specific header
- getHeaderValues(name) - Get all header values
Error Handling
dart
/// Static helpers that log a WebSocket failure and try to notify the peer.
///
/// Every notification is best effort: a failing send never propagates.
class ErrorHandler {
  /// Logs [error] (and [stackTrace] when given) and sends a generic `error`
  /// frame that carries no error details.
  static void handle(WebSocket socket, dynamic error, StackTrace? stackTrace) {
    print('WebSocket error: $error');
    if (stackTrace != null) {
      print('Stack trace: $stackTrace');
    }

    try {
      final frame = jsonEncode({
        'type': 'error',
        'message': 'An error occurred',
        'timestamp': DateTime.now().toIso8601String(),
      });
      socket.send(frame);
    } catch (e) {
      print('Failed to send error to client: $e');
    }
  }

  /// Logs a connection failure and sends a `connection_error` frame that
  /// includes the error text. Send failures are ignored silently.
  static void handleConnectionError(WebSocket socket, dynamic error) {
    print('Connection error: $error');

    try {
      final frame = jsonEncode({
        'type': 'connection_error',
        'message': 'Connection failed',
        'error': error.toString(),
      });
      socket.send(frame);
    } catch (e) {
      // Socket might be closed, ignore
    }
  }

  /// Logs a message-processing failure and sends a `message_error` frame
  /// echoing the offending [message] and the error text.
  static void handleMessageError(
    WebSocket socket,
    String message,
    dynamic error,
  ) {
    print('Message processing error: $error');

    try {
      final frame = jsonEncode({
        'type': 'message_error',
        'message': 'Failed to process message',
        'original_message': message,
        'error': error.toString(),
      });
      socket.send(frame);
    } catch (e) {
      print('Failed to send message error: $e');
    }
  }
}
Exception Handler Features
- Automatic error logging with stack traces
- Client-specific error responses
- Development vs production error details
- Middleware error handling
- Connection and disconnection error handling
Advanced Usage
Real-time Chat Application
dart
// Real-time chat application
/// In-memory chat rooms: membership, per-room message history, room
/// broadcasts, typing indicators and private messages.
///
/// All state is static, so it is shared by every caller in the isolate.
class ChatRoom {
  /// Maximum number of messages kept in memory per room.
  static const int _maxHistoryLength = 1000;

  /// Number of most recent messages replayed to a client joining a room.
  static const int _historyReplayLength = 50;

  /// Source of the random suffix used by [generateMessageId].
  static final Random _random = Random();

  /// Room name -> sockets currently in the room.
  static final Map<String, Set<WebSocket>> _rooms = {};

  /// Room name -> (user id -> `{username, joined_at, socket}`).
  static final Map<String, Map<String, dynamic>> _roomUsers = {};

  /// Room name -> messages in chronological order (oldest first).
  static final Map<String, List<Map<String, dynamic>>> _messageHistory = {};

  /// Adds [socket] to [roomName] on behalf of [userId].
  ///
  /// The joining client receives the most recent history (if any) followed by
  /// a `room_joined` summary; every other member receives `user_joined`.
  static Future joinRoom(String roomName, WebSocket socket, String userId, String username) async {
    _rooms.putIfAbsent(roomName, () => {}).add(socket);
    _roomUsers.putIfAbsent(roomName, () => {})[userId] = {
      'username': username,
      'joined_at': DateTime.now().toIso8601String(),
      'socket': socket
    };

    // Replay the *latest* messages. History is stored oldest-first, so the
    // tail of the list has to be taken, not the head.
    final history = _messageHistory[roomName] ?? [];
    if (history.isNotEmpty) {
      final firstReplayed = history.length > _historyReplayLength
          ? history.length - _historyReplayLength
          : 0;
      socket.send(jsonEncode({
        'type': 'message_history',
        'room': roomName,
        'messages': history.sublist(firstReplayed)
      }));
    }

    // Notify the other room members.
    await broadcastToRoom(roomName, {
      'type': 'user_joined',
      'user_id': userId,
      'username': username,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String()
    }, exclude: socket);

    // Send the room summary. The user id is the key of each entry in
    // _roomUsers; it is not stored inside the per-user info map.
    socket.send(jsonEncode({
      'type': 'room_joined',
      'room': roomName,
      'user_count': _rooms[roomName]?.length ?? 0,
      'users': _roomUsers[roomName]?.entries.map((entry) => {
        'user_id': entry.key,
        'username': entry.value['username'],
        'joined_at': entry.value['joined_at']
      }).toList()
    }));
  }

  /// Removes [socket] / [userId] from [roomName], notifies the remaining
  /// members and drops the room (including its history) once it is empty.
  static Future leaveRoom(String roomName, WebSocket socket, String userId) async {
    final room = _rooms[roomName];
    final roomUsers = _roomUsers[roomName];
    if (room == null || roomUsers == null) return;

    room.remove(socket);

    final userInfo = roomUsers[userId];
    if (userInfo != null) {
      // Notify remaining members; the leaving socket was removed above.
      await broadcastToRoom(roomName, {
        'type': 'user_left',
        'user_id': userId,
        'username': userInfo['username'],
        'room': roomName,
        'timestamp': DateTime.now().toIso8601String()
      });
      roomUsers.remove(userId);
    }

    // Clean up empty rooms.
    if (room.isEmpty) {
      _rooms.remove(roomName);
      _roomUsers.remove(roomName);
      _messageHistory.remove(roomName);
    }
  }

  /// Records [message] in the history of [roomName], broadcasts it to the room
  /// and persists it without waiting for the database write.
  static Future sendMessage(String roomName, String userId, String username, String message) async {
    final messageData = {
      'id': generateMessageId(),
      'user_id': userId,
      'username': username,
      'message': message,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String()
    };

    // Store in history and keep only the newest _maxHistoryLength entries.
    final history = _messageHistory.putIfAbsent(roomName, () => []);
    history.add(messageData);
    if (history.length > _maxHistoryLength) {
      history.removeRange(0, history.length - _maxHistoryLength);
    }

    await broadcastToRoom(roomName, {
      'type': 'message',
      ...messageData
    });

    // Save to database asynchronously.
    unawaited(saveMessageToDatabase(messageData));
  }

  /// Broadcasts a typing indicator for [userId] to everyone else in
  /// [roomName].
  static Future handleTyping(String roomName, String userId, String username, bool isTyping) async {
    await broadcastToRoom(roomName, {
      'type': 'typing',
      'user_id': userId,
      'username': username,
      'is_typing': isTyping,
      'room': roomName,
      'timestamp': DateTime.now().toIso8601String()
    }, excludeUserId: userId);
  }

  /// Delivers a private message to every distinct socket registered for
  /// [toUserId] and persists it without waiting for the database write.
  ///
  /// The message is persisted even when the recipient is not connected.
  static Future sendPrivateMessage(String fromUserId, String fromUsername,
      String toUserId, String message) async {
    final messageData = {
      'id': generateMessageId(),
      'from_user_id': fromUserId,
      'from_username': fromUsername,
      'to_user_id': toUserId,
      'message': message,
      'timestamp': DateTime.now().toIso8601String()
    };
    final payload = jsonEncode({
      'type': 'private_message',
      ...messageData
    });

    // The same socket can sit in several rooms; remember which sockets were
    // already served so the recipient gets the message exactly once per
    // connection.
    final delivered = <WebSocket>{};
    for (final room in _rooms.values) {
      for (final socket in room) {
        if (getUserIdForSocket(socket) != toUserId) continue;
        if (!delivered.add(socket)) continue;
        try {
          socket.send(payload);
        } catch (e) {
          // Keep going so the message is still persisted below.
          print('Failed to deliver private message to $toUserId: $e');
        }
      }
    }

    // Save to database.
    unawaited(savePrivateMessageToDatabase(messageData));
  }

  /// Sends [message] to every socket in [roomName].
  ///
  /// [message] is sent verbatim when it is a `String` and JSON-encoded
  /// otherwise. [exclude] skips one socket; [excludeUserId] skips the sockets
  /// registered for that user. Sockets whose send throws are removed from the
  /// room.
  static Future broadcastToRoom(String roomName, dynamic message, {
    WebSocket? exclude,
    String? excludeUserId
  }) async {
    final room = _rooms[roomName];
    if (room == null) return;

    final messageStr = message is String ? message : jsonEncode(message);

    // Failed sockets are collected and removed after the loop: removing from
    // the set while iterating it throws ConcurrentModificationError.
    final failed = <WebSocket>[];
    for (final socket in room) {
      if (socket == exclude) continue;
      if (excludeUserId != null && getUserIdForSocket(socket) == excludeUserId) {
        continue;
      }
      try {
        socket.send(messageStr);
      } catch (e) {
        print('Failed to broadcast to room $roomName: $e');
        failed.add(socket);
      }
    }
    room.removeAll(failed);
  }

  /// Returns the name, member count, member list and stored message count of
  /// [roomName]. Unknown rooms yield zero counts and an empty member list.
  static Map<String, dynamic> getRoomInfo(String roomName) {
    final room = _rooms[roomName];
    final users = _roomUsers[roomName];
    return {
      'name': roomName,
      'user_count': room?.length ?? 0,
      'users': users?.entries.map((entry) => {
        'user_id': entry.key,
        'username': entry.value['username'],
        'joined_at': entry.value['joined_at']
      }).toList() ?? [],
      'message_count': _messageHistory[roomName]?.length ?? 0
    };
  }

  /// Builds a message id from the current time in milliseconds plus a random
  /// suffix in the range 0-999.
  static String generateMessageId() {
    return 'msg_${DateTime.now().millisecondsSinceEpoch}_${_random.nextInt(1000)}';
  }

  /// Returns the id of the first user, in any room, registered with [socket],
  /// or `null` when the socket is unknown.
  static String? getUserIdForSocket(WebSocket socket) {
    for (final roomUsers in _roomUsers.values) {
      for (final entry in roomUsers.entries) {
        if (entry.value['socket'] == socket) {
          return entry.key;
        }
      }
    }
    return null;
  }

  /// Inserts a room message into the `chat_messages` table. Failures are
  /// logged and not rethrown.
  static Future saveMessageToDatabase(Map<String, dynamic> message) async {
    try {
      await Database.table('chat_messages').insert({
        'id': message['id'],
        'user_id': message['user_id'],
        'room': message['room'],
        'message': message['message'],
        'created_at': message['timestamp']
      });
    } catch (e) {
      print('Failed to save message to database: $e');
    }
  }

  /// Inserts a private message into the `private_messages` table. Failures
  /// are logged and not rethrown.
  static Future savePrivateMessageToDatabase(Map<String, dynamic> message) async {
    try {
      await Database.table('private_messages').insert({
        'id': message['id'],
        'from_user_id': message['from_user_id'],
        'to_user_id': message['to_user_id'],
        'message': message['message'],
        'created_at': message['timestamp']
      });
    } catch (e) {
      print('Failed to save private message to database: $e');
    }
  }
}
// Chat message handler
/// Decodes raw chat frames and dispatches them to [ChatRoom].
class ChatMessageHandler {
  /// Parses [message] as JSON and routes it by its `type` field.
  ///
  /// Requests that lack a required field are ignored. An unrecognised `type`
  /// is answered with an `error` message naming it, and any exception raised
  /// while decoding or dispatching is answered with an
  /// `Invalid message format` error.
  static Future handleMessage(WebSocket socket, String message, String userId, String username) async {
    try {
      final payload = jsonDecode(message);
      final type = payload['type'];
      // Most requests target a room, so the lookup is done once up front.
      final room = payload['room'];

      if (type == 'join_room') {
        if (room != null) {
          await ChatRoom.joinRoom(room, socket, userId, username);
        }
      } else if (type == 'leave_room') {
        if (room != null) {
          await ChatRoom.leaveRoom(room, socket, userId);
        }
      } else if (type == 'send_message') {
        final text = payload['message'];
        if (room != null && text != null) {
          await ChatRoom.sendMessage(room, userId, username, text);
        }
      } else if (type == 'typing') {
        final typing = payload['is_typing'] ?? false;
        if (room != null) {
          await ChatRoom.handleTyping(room, userId, username, typing);
        }
      } else if (type == 'private_message') {
        final recipient = payload['to_user'];
        final text = payload['message'];
        if (recipient != null && text != null) {
          await ChatRoom.sendPrivateMessage(userId, username, recipient, text);
        }
      } else if (type == 'get_room_info') {
        if (room != null) {
          final reply = {
            'type': 'room_info',
            'room': room,
            'info': ChatRoom.getRoomInfo(room)
          };
          socket.send(jsonEncode(reply));
        }
      } else {
        final reply = {
          'type': 'error',
          'message': 'Unknown message type: $type'
        };
        socket.send(jsonEncode(reply));
      }
    } catch (_) {
      final reply = {
        'type': 'error',
        'message': 'Invalid message format'
      };
      socket.send(jsonEncode(reply));
    }
  }
}
Live Notifications
dart
/// Tracks the open sockets of each user and pushes notifications to them.
///
/// A user may have several sockets registered at once; every one of them
/// receives the notification.
class NotificationManager {
  /// User id -> sockets currently registered for that user.
  static final Map<String, Set<WebSocket>> _userSockets = {};

  /// Registers [socket] as a connection belonging to [userId].
  static void registerUser(String userId, WebSocket socket) {
    _userSockets.putIfAbsent(userId, () => {}).add(socket);
  }

  /// Removes [socket] from [userId] and drops the user entry once no sockets
  /// remain.
  static void unregisterUser(String userId, WebSocket socket) {
    final sockets = _userSockets[userId];
    if (sockets == null) return;

    sockets.remove(socket);
    if (sockets.isEmpty) {
      _userSockets.remove(userId);
    }
  }

  /// Sends a notification to every socket registered for [userId].
  ///
  /// Does nothing when the user has no registered sockets.
  static void sendNotification(String userId, String title, String message, {
    String type = 'info',
    Map<String, dynamic>? data
  }) {
    final sockets = _userSockets[userId];
    if (sockets == null) return;

    _deliver(sockets, _encodeNotification(title, message, type, data));
  }

  /// Sends a notification to every registered socket of every user.
  static void broadcastNotification(String title, String message, {
    String type = 'info',
    Map<String, dynamic>? data
  }) {
    // Encode once; the payload is identical for every recipient.
    final payload = _encodeNotification(title, message, type, data);
    for (final sockets in _userSockets.values) {
      _deliver(sockets, payload);
    }
  }

  /// Builds the JSON text of a notification stamped with the current time.
  static String _encodeNotification(
      String title, String message, String type, Map<String, dynamic>? data) {
    return jsonEncode({
      'type': 'notification',
      'title': title,
      'message': message,
      'notification_type': type,
      'data': data,
      'timestamp': DateTime.now().toIso8601String()
    });
  }

  /// Sends [payload] to each of [sockets]. A socket whose send throws is
  /// logged and skipped so the remaining recipients are still served.
  static void _deliver(Iterable<WebSocket> sockets, String payload) {
    for (final socket in sockets) {
      try {
        socket.send(payload);
      } catch (e) {
        print('Failed to send notification: $e');
      }
    }
  }
}
Game Server
dart
/// Registry of game rooms and players, with entry points for creating and
/// joining rooms, routing player messages and starting games.
class GameServer {
  /// Room id -> room.
  static final Map<String, GameRoom> _rooms = {};

  /// Player id -> the player object created by the most recent [joinRoom].
  static final Map<String, Player> _players = {};

  /// Creates a room of [gameType] hosted by [hostId] and returns its id.
  static String createRoom(String gameType, String hostId) {
    final roomId = generateRoomId();
    _rooms[roomId] = GameRoom(roomId, gameType, hostId);
    return roomId;
  }

  /// Adds [playerId] (connected through [socket]) to the room [roomId].
  ///
  /// Returns `false` when the room does not exist, `true` otherwise.
  static bool joinRoom(String roomId, String playerId, WebSocket socket) {
    final room = _rooms[roomId];
    if (room == null) return false;

    final player = Player(playerId, socket);
    _players[playerId] = player;
    room.addPlayer(player);
    return true;
  }

  /// Routes [message] from [playerId] to the room [roomId].
  ///
  /// The message is dropped when the room or the player is unknown, or when
  /// the player has not joined that room. Without the membership check any
  /// registered player could inject actions into a room they are not part of.
  static void handleGameMessage(String roomId, String playerId, Map<String, dynamic> message) {
    final room = _rooms[roomId];
    final player = _players[playerId];
    if (room == null || player == null) return;

    // Compare by id: _players only keeps the object from the latest join,
    // which may differ from the instance stored in an earlier room.
    final isMember = room.players.any((member) => member.id == playerId);
    if (!isMember) return;

    room.handleMessage(player, message);
  }

  /// Starts the game in [roomId]; does nothing when the room is unknown.
  static void startGame(String roomId) {
    _rooms[roomId]?.startGame();
  }
}
/// A single game session: its players and the events broadcast among them.
class GameRoom {
  final String id;
  final String gameType;
  final String hostId;

  /// Players currently in the room, in join order.
  final List<Player> players = [];

  /// Whether [startGame] has been called.
  bool gameStarted = false;

  GameRoom(this.id, this.gameType, this.hostId);

  /// Adds [player] and announces the new player count to the room.
  ///
  /// A player who joins again with the same id replaces the earlier entry, so
  /// the player list and `players_count` never contain duplicates.
  void addPlayer(Player player) {
    players.removeWhere((existing) => existing.id == player.id);
    players.add(player);
    broadcast({
      'type': 'player_joined',
      'player_id': player.id,
      'players_count': players.length
    });
  }

  /// Dispatches [message] from [sender] by its `action` field. Unknown
  /// actions are ignored.
  void handleMessage(Player sender, Map<String, dynamic> message) {
    switch (message['action']) {
      case 'move':
        handleMove(sender, message);
        break;
      case 'attack':
        handleAttack(sender, message);
        break;
      case 'chat':
        handleChat(sender, message);
        break;
    }
  }

  /// Broadcasts the `position` reported by [player].
  void handleMove(Player player, Map<String, dynamic> data) {
    broadcast({
      'type': 'player_moved',
      'player_id': player.id,
      'position': data['position']
    });
  }

  /// Broadcasts an attack by [attacker] using the `target_id` and `damage`
  /// taken from [data].
  void handleAttack(Player attacker, Map<String, dynamic> data) {
    broadcast({
      'type': 'player_attacked',
      'attacker_id': attacker.id,
      'target_id': data['target_id'],
      'damage': data['damage']
    });
  }

  /// Broadcasts a chat line from [sender].
  void handleChat(Player sender, Map<String, dynamic> data) {
    broadcast({
      'type': 'chat_message',
      'player_id': sender.id,
      'message': data['message']
    });
  }

  /// Sends [message], JSON-encoded once, to every player in the room.
  ///
  /// A player whose socket throws is logged and skipped so that one broken
  /// connection cannot stop the remaining players from receiving the event.
  void broadcast(Map<String, dynamic> message) {
    final jsonMessage = jsonEncode(message);
    for (final player in players) {
      try {
        player.socket.send(jsonMessage);
      } catch (e) {
        print('Failed to send to player ${player.id} in room $id: $e');
      }
    }
  }

  /// Marks the game as started and announces the participating player ids.
  void startGame() {
    gameStarted = true;
    broadcast({
      'type': 'game_started',
      'players': players.map((p) => p.id).toList()
    });
  }
}
/// A connected player: an id paired with the socket used to reach them.
class Player {
  /// Identifier of the player.
  final String id;

  /// Connection used to deliver messages to the player.
  final WebSocket socket;

  Player(this.id, this.socket);

  /// JSON-encodes [message] and sends it over [socket].
  void send(Map<String, dynamic> message) {
    final encoded = jsonEncode(message);
    socket.send(encoded);
  }
}
/// Counter appended to room ids so that ids created within the same
/// millisecond stay distinct.
int _roomIdSequence = 0;

/// Returns a new room id of the form `room_<epoch millis>_<sequence>`.
///
/// The timestamp alone is not unique: two rooms created in the same
/// millisecond would share an id and the second would replace the first in
/// any map keyed by it. The sequence number removes that collision.
String generateRoomId() {
  final sequence = _roomIdSequence++;
  return 'room_${DateTime.now().millisecondsSinceEpoch}_$sequence';
}
Best Practices
✅ Do's
- Use rooms for organizing clients by context (chat rooms, game lobbies, etc.)
- Implement proper authentication and authorization middleware
- Use event subscriptions for targeted messaging
- Handle errors gracefully and provide meaningful error messages
- Use middleware for cross-cutting concerns like logging and rate limiting
- Implement proper cleanup when clients disconnect
- Use client context to store user-specific data
- Test your WebSocket handlers thoroughly
- Use meaningful event names and consistent message formats
- Monitor connection counts and message throughput
❌ Don'ts
- Don't send sensitive data without proper authentication
- Don't block the event loop with long-running operations
- Don't rely on client-side validation alone
- Don't forget to handle client disconnections
- Don't use WebSocket for file uploads (use HTTP instead)
- Don't broadcast to all clients when you only need specific rooms
- Don't ignore middleware errors - they can break the connection
- Don't store large amounts of data in client context
- Don't use synchronous operations in async handlers
- Don't forget to implement rate limiting for production