Queue System
Simple and efficient background job processing with multiple drivers and monitoring capabilities.
Background Jobs · Multiple Drivers · Monitoring · Delayed Jobs · Retry Logic · File Persistence
Quick Start
Basic Job Processing
dart
// Build the queue manager from the application config, then initialize it
// before any job is dispatched.
final config = Config(); // Your config implementation
final queueManager = QueueManager(config);

await queueManager.init();
// Create and dispatch a simple job
/// Queue job that sends a welcome email to a single user.
class WelcomeEmailJob extends QueueJob {
  WelcomeEmailJob(this.userEmail, this.userName);

  /// Address the welcome email is sent to.
  final String userEmail;

  /// Name of the recipient, used in the log output.
  final String userName;

  /// Human-readable job name.
  @override
  String get displayName => 'Send Welcome Email';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 3;

  /// Prints the start message, simulates one second of work, then prints
  /// the confirmation.
  @override
  Future<void> handle() async {
    print('Sending welcome email to $userName at $userEmail');
    // Send email logic here
    const simulatedWork = Duration(seconds: 1);
    await Future.delayed(simulatedWork);
    print('✅ Welcome email sent!');
  }
}
// Queue a welcome email for one user.
final job = WelcomeEmailJob('user@example.com', 'John Doe');
await queueManager.dispatch(job);

// Start a worker and report the outcome of every job it handles.
await queueManager.startWorker(
  delay: const Duration(seconds: 2),
  onJobComplete: (finished, result) {
    print('Job completed: ${finished.displayName}');
  },
  onJobError: (failed, error, stack) {
    print('Job failed: ${failed.displayName} - $error');
  },
);
💡 Note: Jobs are processed asynchronously without complex serialization
⚡ Tip: Use delayed jobs for scheduled tasks and background processing
Creating Jobs
dart
// Simple job class
/// Queue job that processes the payment for a single order.
///
/// The gateway call is simulated: [handle] waits two seconds and then fails
/// at random by throwing an [Exception].
class ProcessPaymentJob extends QueueJob {
  /// Identifier of the order being charged.
  final String orderId;

  /// Amount to charge; printed with a leading dollar sign.
  final double amount;

  ProcessPaymentJob(this.orderId, this.amount);

  /// Simulates the payment and throws `Exception('Payment gateway timeout')`
  /// on a random half of the runs.
  @override
  Future<void> handle() async {
    // `\$` emits a literal dollar sign. An unescaped `$$` does not compile,
    // because `$` must be followed by an identifier or `{`.
    print('Processing payment for order $orderId: \$$amount');
    // Simulate payment processing
    await Future.delayed(Duration(seconds: 2));
    if (Random().nextBool()) {
      throw Exception('Payment gateway timeout');
    }
    print('✅ Payment processed successfully');
  }

  /// Human-readable job name.
  @override
  String get displayName => 'Process Payment';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 5;
}
// Job with custom serialization
/// Queue job that sends one notification to one user, with explicit JSON
/// serialization of its fields.
class SendNotificationJob extends QueueJob {
  SendNotificationJob(this.userId, this.message, this.type);

  /// Rebuilds a job from the map produced by [toJson].
  ///
  /// Throws a [StateError] when `json['type']` matches no
  /// [NotificationType] name.
  static SendNotificationJob fromJson(Map<String, dynamic> json) {
    // Read the fields in the same order as the constructor arguments.
    final int userId = json['userId'];
    final String message = json['message'];
    final typeName = json['type'];
    final type = NotificationType.values
        .firstWhere((candidate) => candidate.name == typeName);
    return SendNotificationJob(userId, message, type);
  }

  /// Recipient of the notification.
  final int userId;

  /// Text of the notification.
  final String message;

  /// Channel the notification is sent through.
  final NotificationType type;

  /// Human-readable job name, including the channel name.
  @override
  String get displayName => 'Send ${type.name} Notification';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 3;

  /// Prints the notification, simulates 500 ms of work, then confirms.
  @override
  Future<void> handle() async {
    print('Sending $type notification to user $userId: $message');
    // Send notification logic
    const simulatedWork = Duration(milliseconds: 500);
    await Future.delayed(simulatedWork);
    print('✅ Notification sent');
  }

  /// Custom serialization; the enum is stored by its name.
  @override
  Map<String, dynamic> toJson() {
    return {
      'userId': userId,
      'message': message,
      'type': type.name,
    };
  }
}
/// Channels a notification can be sent through.
enum NotificationType {
  email,
  sms,
  push,
}
Job Interface
- `handle()` - Main job execution method
- `displayName` - Human-readable job name
- `maxRetries` - Maximum retry attempts
- `toJson()` - Optional serialization for complex jobs
Queue Drivers
Memory Driver
dart
// Memory driver: fast, but jobs are lost on restart.
final config = {
  'queue': {'driver': 'memory'},
};

// Build and initialize the manager with that configuration.
final queueManager = QueueManager(config);
await queueManager.init();

// The dispatched job is stored in memory only.
final welcomeJob = WelcomeEmailJob('user@example.com', 'John');
await queueManager.dispatch(welcomeJob);
// ⚠️ Jobs will be lost if application restarts
// ✅ Perfect for development and testing
// ✅ Fastest performance
File Driver
dart
// File driver: jobs are persisted to the JSON file named by `path`.
final config = {
  'queue': {
    'driver': 'file',
    'file': {'path': 'storage/queue/jobs.json'},
  },
};

// Build and initialize the manager with that configuration.
final queueManager = QueueManager(config);
await queueManager.init();

// The dispatched job is saved to the file and survives restarts.
final importantJob = ImportantJob();
await queueManager.dispatch(importantJob);
// File structure:
// storage/queue/jobs.json
// {
// "jobs": [
// {
// "id": "123456789",
// "type": "ImportantJob",
// "payload": {...},
// "scheduledAt": "2024-01-15T10:30:00.000Z",
// "attempts": 0,
// "maxRetries": 3
// }
// ]
// }
Sync Driver
dart
// Sync driver: jobs are processed immediately instead of being queued.
final config = {
  'queue': {'driver': 'sync'},
};

// Build and initialize the manager with that configuration.
final queueManager = QueueManager(config);
await queueManager.init();

// With this driver the job executes as part of the dispatch call.
final quickJob = QuickJob();
await queueManager.dispatch(quickJob);
// Output: Job executed immediately!
// ✅ No background processing needed
// ✅ Perfect for simple synchronous operations
// ❌ No actual queuing - jobs run in main thread
Driver Comparison
Driver | Persistence | Performance | Use Case |
---|---|---|---|
Memory | ❌ None | ⚡ Fastest | Development/Testing |
File | ✅ Disk | 🟡 Medium | Simple persistence |
Sync | ❌ None | ⚡ Immediate | Synchronous processing |
Redis | ✅ Redis | 🟡 Fast | Production clusters |
Configuration
dart
// Minimal configuration: only the driver name.
final config = {
  'queue': {'driver': 'memory'}, // memory, file, sync, redis
};

// File driver: where the queue file lives and the size/backup limits.
final fileConfig = {
  'queue': {
    'driver': 'file',
    'file': {
      'path': 'storage/queue/jobs.json',
      'max_file_size': 5242880, // 5MB
      'max_backups': 5,
    },
  },
};

// Redis driver: which connection and queue name to use.
final redisConfig = {
  'queue': {
    'driver': 'redis',
    'redis': {
      'connection': 'default',
      'queue': 'default',
      'retry_after': 90,
    },
  },
};

// Worker tuning.
final workerConfig = {
  'queue': {
    'worker': {
      'delay': 1, // seconds between jobs
      'max_jobs': 1000,
      'timeout': 3600, // 1 hour
    },
  },
};

// Build and initialize the manager with the basic configuration.
final queueManager = QueueManager(config);
await queueManager.init();

// Make a custom driver available under the name 'custom'.
final customDriver = CustomQueueDriver();
QueueFactory.registerDriver('custom', customDriver);
Configuration Options
- `driver` - Queue driver (memory, file, sync, redis)
- `file.path` - File path for file driver
- `redis.connection` - Redis connection for redis driver
- `worker.delay` - Delay between job processing
- `worker.max_jobs` - Maximum jobs per worker
Delayed Jobs
dart
// Immediate job execution: no delay argument.
await queueManager.dispatch(UrgentJob());

// Delayed job execution: the job becomes due five minutes from now.
await queueManager.dispatch(
  ScheduledJob(),
  delay: Duration(minutes: 5),
);

// Various delay examples
final delays = [
  Duration(seconds: 30), // 30 seconds
  Duration(minutes: 5), // 5 minutes
  Duration(hours: 1), // 1 hour
  Duration(days: 1), // 1 day
  Duration(hours: 24), // 24 hours (same length as Duration(days: 1))
];

// Schedule the newsletter for 9 AM tomorrow. A delay is relative to "now",
// so a fixed Duration(hours: 9) would fire nine hours from now rather than
// at 9 AM. The DateTime constructor rolls an out-of-range day over into the
// next month, so `now.day + 1` is safe at month end.
final now = DateTime.now();
final tomorrowAtNine = DateTime(now.year, now.month, now.day + 1, 9);
await queueManager.dispatch(
  SendNewsletterJob(),
  delay: tomorrowAtNine.difference(now),
);
// Retry with backoff
/// Job that re-dispatches itself with a doubling delay when its operation
/// fails, until [maxRetries] re-dispatches have been used.
class RetryWithBackoffJob extends QueueJob {
  RetryWithBackoffJob([this.attempt = 0]);

  /// Zero-based number of this attempt; each re-dispatch carries `attempt + 1`.
  final int attempt;

  /// Human-readable job name, including the one-based attempt number.
  @override
  String get displayName => 'Retry Operation (Attempt ${attempt + 1})';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 5;

  /// Runs the operation once; on failure either schedules the next attempt
  /// or reports the permanent failure. The error is never rethrown.
  @override
  Future<void> handle() async {
    try {
      await riskyOperation();
      print('✅ Operation succeeded on attempt ${attempt + 1}');
    } catch (_) {
      final retriesExhausted = attempt >= maxRetries;
      if (retriesExhausted) {
        print('❌ Operation failed permanently after ${attempt + 1} attempts');
        return;
      }

      // Exponential backoff: 30s, 1m, 2m, 4m, 8m...
      final backoff = Duration(seconds: 30 * (1 << attempt));
      final nextAttempt = RetryWithBackoffJob(attempt + 1);
      await queueManager.dispatch(nextAttempt, delay: backoff);
      print('⏰ Retrying in ${backoff.inSeconds} seconds...');
    }
  }
}
Delay Examples
- `Duration(seconds: 30)` - 30 seconds delay
- `Duration(minutes: 5)` - 5 minutes delay
- `Duration(hours: 1)` - 1 hour delay
- `Duration(days: 1)` - 1 day delay
- `null` - Immediate processing
Queue Workers
dart
// Basic worker: all defaults.
await queueManager.startWorker();

// Worker with custom limits.
await queueManager.startWorker(
  maxJobs: 100, // Process max 100 jobs
  delay: const Duration(seconds: 5), // 5 second delay between jobs
  timeout: const Duration(hours: 1), // Stop after 1 hour
  runInBackground: true, // Run in background
);

// Worker that reports every lifecycle event.
await queueManager.startWorker(
  onJobStart: (job) => print('🚀 Starting job: ${job.displayName}'),
  onJobComplete: (job, result) => print('✅ Completed job: ${job.displayName}'),
  onJobError: (job, error, stack) {
    // Send alert, log to external service, etc.
    print('❌ Failed job: ${job.displayName} - $error');
  },
  onError: (error, stack) {
    // Handle worker-level errors
    print('💥 Worker error: $error');
  },
);

// Long-running worker for production: no job limit and no timeout.
await queueManager.startWorker(
  delay: const Duration(milliseconds: 500), // Fast processing
  maxJobs: null, // Run indefinitely
  timeout: null, // No timeout
  runInBackground: true,
);

// Development worker with verbose logging.
await queueManager.startWorker(
  delay: const Duration(seconds: 2),
  onJobStart: (job) {
    print('[DEV] Processing: ${job.displayName}');
  },
  onJobComplete: (job, result) {
    print('[DEV] Completed: ${job.displayName}');
  },
  onJobError: (job, error, stack) {
    print('[DEV] Error in ${job.displayName}: $error');
  },
);
Worker Configuration
- `maxJobs` - Maximum jobs to process before stopping
- `delay` - Delay between processing jobs
- `timeout` - Maximum time to run
- `runInBackground` - Run worker in background
- `onError` - Error callback function
- `onJobStart` / `onJobComplete` / `onJobError` - Job lifecycle callbacks
Batch Processing
dart
// A mixed batch: three emails, one payment and one inventory update.
final jobs = [
  for (var n = 1; n <= 3; n++)
    SendEmailJob('user$n@example.com', 'Welcome User $n'),
  ProcessPaymentJob('order_123', 99.99),
  UpdateInventoryJob('product_456', -5),
];

// Dispatch all at once
await queueManager.dispatchBatch(jobs);

// Dispatch with individual delays
const batchDelay = Duration(seconds: 10);
await queueManager.dispatchBatch(jobs, delay: batchDelay);
// Batch processing job
/// Queue job that sends one templated email to every address in [emails].
///
/// A failure for one recipient is printed and does not stop the remaining
/// recipients from being processed.
class BulkEmailJob extends QueueJob {
  BulkEmailJob(this.emails, this.subject, this.template);

  /// Recipient addresses.
  final List<String> emails;

  /// Subject line shared by all emails.
  final String subject;

  /// Body template; every `{{email}}` is replaced by the recipient address.
  final String template;

  /// Human-readable job name, including the recipient count.
  @override
  String get displayName => 'Send Bulk Email (${emails.length} recipients)';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 2;

  @override
  Future<void> handle() async {
    print('Sending ${emails.length} emails...');
    for (final recipient in emails) {
      try {
        final body = template.replaceAll('{{email}}', recipient);
        await sendEmail(recipient, subject, body);
        print('✅ Sent to $recipient');
      } catch (error) {
        // Continue with other emails
        print('❌ Failed to send to $recipient: $error');
      }
    }
    print('✅ Bulk email job completed');
  }
}
// Usage: one job that covers three recipients.
final recipients = [
  'user1@example.com',
  'user2@example.com',
  'user3@example.com',
];
final newsletterJob = BulkEmailJob(
  recipients,
  'Monthly Newsletter',
  'Hello {{email}}, here is your newsletter...',
);
await queueManager.dispatch(newsletterJob);
Batch Benefits
- • Efficient bulk operations
- • Reduced overhead for multiple jobs
- • Atomic batch processing
- • Better resource utilization
Monitoring and Metrics
dart
// Take a snapshot of the current metrics and print the headline numbers.
final metrics = queueManager.getMetrics();
print('Queue Metrics:');
print(' Total Queued: ${metrics['total_queued']}');
print(' Total Completed: ${metrics['total_completed']}');
print(' Total Failed: ${metrics['total_failed']}');
print(' Currently Processing: ${metrics['currently_processing']}');
// success_rate is multiplied by 100 to display it as a percentage.
final successPercent = (metrics['success_rate'] * 100).toStringAsFixed(1);
print(' Success Rate: $successPercent%');
print(' Average Processing Time: ${metrics['average_processing_time_ms']}ms');

// Completed-job counts per job type.
final completedByType = metrics['completed_by_type'] as Map<String, int>;
print('Jobs by Type:');
for (final entry in completedByType.entries) {
  print(' ${entry.key}: ${entry.value}');
}

// Reset metrics (useful for testing)
queueManager.resetMetrics();
// Custom monitoring
/// Periodically prints a health summary of a [QueueManager] and warns when
/// the failure rate or the number of in-flight jobs is high.
class QueueMonitor {
  final QueueManager _queueManager;

  /// The active periodic timer, or `null` while monitoring is stopped.
  ///
  /// It is kept so the monitor can be stopped; a discarded [Timer.periodic]
  /// can never be cancelled.
  Timer? _timer;

  QueueMonitor(this._queueManager);

  /// Starts a health check every five minutes.
  ///
  /// Calling this again replaces the running timer instead of adding a
  /// second one.
  void startMonitoring() {
    _timer?.cancel();
    _timer = Timer.periodic(Duration(minutes: 5), (_) => _checkHealth());
  }

  /// Stops the periodic health check; does nothing if it is not running.
  void stopMonitoring() {
    _timer?.cancel();
    _timer = null;
  }

  /// Reads the current metrics, prints warnings, then prints the summary.
  void _checkHealth() {
    final metrics = _queueManager.getMetrics();
    // Check for issues
    if (metrics['failure_rate'] > 0.1) { // > 10% failure rate
      print('⚠️ High failure rate detected: ${(metrics['failure_rate'] * 100).toStringAsFixed(1)}%');
      // Send alert
    }
    if (metrics['currently_processing'] > 10) { // Too many jobs processing
      print('⚠️ High concurrent jobs: ${metrics['currently_processing']}');
      // Scale up workers
    }
    // Log metrics
    print('📊 Queue Health Check:');
    print(' Success Rate: ${(metrics['success_rate'] * 100).toStringAsFixed(1)}%');
    print(' Processing: ${metrics['currently_processing']} jobs');
    print(' Queue Size: ${metrics['total_queued']} jobs');
  }
}
Available Metrics
- `total_queued` - Total jobs queued
- `total_completed` - Jobs successfully completed
- `total_failed` - Jobs that failed
- `currently_processing` - Jobs currently being processed
- `success_rate` - Fraction of successful jobs (0–1; multiply by 100 for a percentage)
- `average_processing_time_ms` - Average job processing time in milliseconds
Error Handling and Retries
dart
// Job with error handling
/// Queue job that processes one order: validate, charge, update inventory,
/// then send the confirmation email.
///
/// Any failure is printed, logged through `ErrorLogger`, and then rethrown.
class ProcessOrderJob extends QueueJob {
  /// Identifier of the order to process.
  final String orderId;

  ProcessOrderJob(this.orderId);

  /// Runs the four processing steps in order.
  ///
  /// Throws whatever the failing step threw, after logging it.
  @override
  Future<void> handle() async {
    try {
      // Step 1: Validate order
      final order = await Order.find(orderId);
      if (order == null) {
        throw Exception('Order not found: $orderId');
      }
      // Step 2: Process payment
      await PaymentService.charge(order);
      // Step 3: Update inventory
      for (final item in order.items) {
        await Inventory.decrement(item.productId, item.quantity);
      }
      // Step 4: Send confirmation
      await EmailService.sendOrderConfirmation(order);
      print('✅ Order $orderId processed successfully');
    } catch (e) {
      print('❌ Failed to process order $orderId: $e');
      // Log detailed error information
      await ErrorLogger.log('order_processing_failed', {
        'orderId': orderId,
        'error': e.toString(),
        'timestamp': DateTime.now().toIso8601String(),
        // NOTE(review): `attempt` is not declared in this class; presumably
        // QueueJob provides it - confirm.
        'attempt': attempt,
      });
      // Rethrow so the failure reaches the queue. If the error were swallowed
      // here, handle() would complete normally, the job would count as
      // successful and it would never be retried.
      rethrow;
    }
  }

  /// Human-readable job name, including the order id.
  @override
  String get displayName => 'Process Order $orderId';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 3;
}
// Worker that reports job failures and worker-level errors externally.
await queueManager.startWorker(
  onJobError: (job, error, stack) async {
    // Log to external service
    final failureDetails = {
      'job': job.displayName,
      'error': error.toString(),
      'stack': stack.toString(),
      'timestamp': DateTime.now().toIso8601String(),
    };
    await ExternalLogger.error('Queue job failed', failureDetails);

    // Send alert for critical jobs
    final isCritical = job.displayName.contains('Payment') ||
        job.displayName.contains('Order');
    if (!isCritical) {
      return;
    }
    await AlertService.send('Critical job failed: ${job.displayName}');
  },
  onError: (error, stack) async {
    // Worker-level error
    final crashDetails = {
      'error': error.toString(),
      'stack': stack.toString(),
    };
    await ExternalLogger.critical('Queue worker error', crashDetails);

    // Restart worker or send alert
    await AlertService.send('Queue worker crashed - manual intervention required');
  },
);
// Graceful error recovery
/// Job that retries its operation inside [handle] rather than relying on
/// queue-level retries.
class ResilientJob extends QueueJob {
  /// Human-readable job name.
  @override
  String get displayName => 'Resilient Operation';

  /// Zero, because retries are handled internally.
  @override
  int get maxRetries => 0; // Handle retries internally

  /// Tries the operation up to three times, waiting 5 s and then 10 s
  /// between attempts.
  ///
  /// Throws an [Exception] once the third attempt has failed.
  @override
  Future<void> handle() async {
    const attemptLimit = 3;
    for (var attemptNumber = 1; attemptNumber <= attemptLimit; attemptNumber++) {
      try {
        await performRiskyOperation();
        print('✅ Operation succeeded');
        return;
      } catch (error) {
        print('⚠️ Attempt $attemptNumber failed: $error');
        final hasAttemptsLeft = attemptNumber < attemptLimit;
        if (hasAttemptsLeft) {
          // Wait before retry
          await Future.delayed(Duration(seconds: attemptNumber * 5));
        }
      }
    }
    throw Exception('Operation failed after $attemptLimit attempts');
  }
}
Retry Behavior
- • Jobs retry automatically on failure
- • Configurable maximum retry attempts
- • Exponential backoff for retries
- • Failed jobs are logged but don't crash workers
- • File driver persists retry state across restarts
Real-World Examples
Email Processing
dart
// Email processing job
/// Queue job that sends a personalized welcome email to [user] and then
/// marks the user as welcomed.
class SendWelcomeEmailJob extends QueueJob {
  /// The user the email is sent to.
  final User user;

  SendWelcomeEmailJob(this.user);

  /// Builds the subject and body, sends the email, then sets
  /// `welcome_email_sent` on the user.
  ///
  /// Any failure is printed and rethrown.
  @override
  Future<void> handle() async {
    try {
      // Personalize email content
      final subject = 'Welcome to Our Platform, ${user.firstName}!';
      final body = '''
Hi ${user.firstName},
Welcome to our platform! We're excited to have you here.
Your account details:
- Email: ${user.email}
- Registration Date: ${user.createdAt}
Best regards,
The Team
''';
      // Send email
      await EmailService.send(
        to: user.email,
        subject: subject,
        body: body,
        html: generateWelcomeHtml(user),
      );
      // Update user status
      await user.update({'welcome_email_sent': true});
      print('✅ Welcome email sent to ${user.email}');
    } catch (e) {
      print('❌ Failed to send welcome email to ${user.email}: $e');
      // Let queue handle retry. `rethrow` keeps the original stack trace,
      // which `throw e` would replace with one starting here.
      rethrow;
    }
  }

  /// Human-readable job name, including the recipient address.
  @override
  String get displayName => 'Send Welcome Email to ${user.email}';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 3;
}
// Bulk email job
/// Queue job that sends [newsletter] to every user in [subscribers] and
/// records the totals through `Analytics`.
class SendNewsletterJob extends QueueJob {
  SendNewsletterJob(this.subscribers, this.newsletter);

  /// Users who receive the newsletter.
  final List<User> subscribers;

  /// The newsletter being sent.
  final Newsletter newsletter;

  /// Human-readable job name, including the newsletter title.
  @override
  String get displayName => 'Send Newsletter: ${newsletter.title}';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 2;

  /// Sends to each subscriber in turn, pausing 100 ms after every one.
  ///
  /// A failure for one subscriber is counted and printed; the loop goes on.
  @override
  Future<void> handle() async {
    var sent = 0;
    var failed = 0;
    const pauseBetweenSends = Duration(milliseconds: 100);

    for (final subscriber in subscribers) {
      try {
        await EmailService.sendNewsletter(subscriber, newsletter);
        sent += 1;
        print('✅ Newsletter sent to ${subscriber.email}');
      } catch (error) {
        failed += 1;
        print('❌ Failed to send newsletter to ${subscriber.email}: $error');
      }
      // Small delay to avoid overwhelming email service
      await Future.delayed(pauseBetweenSends);
    }

    print('📊 Newsletter sent: $sent success, $failed failed');

    // Log completion
    final summary = {
      'newsletter_id': newsletter.id,
      'total_sent': sent,
      'total_failed': failed,
    };
    await Analytics.track('newsletter_sent', summary);
  }
}
Data Processing
dart
// Data import job
/// Queue job that imports users from a CSV file, deletes the file, and then
/// sends a summary notification.
class ImportUsersJob extends QueueJob {
  ImportUsersJob(this.csvFilePath);

  /// Path of the CSV file to import; its first line is treated as a header.
  final String csvFilePath;

  /// Human-readable job name.
  @override
  String get displayName => 'Import Users from CSV';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 1;

  /// Imports every data line, counting imported, skipped and failed lines.
  ///
  /// A line that fails validation is skipped; a line that throws is counted
  /// as an error and printed. Neither stops the import.
  @override
  Future<void> handle() async {
    final file = File(csvFilePath);
    final lines = await file.readAsLines();
    var imported = 0;
    var skipped = 0;
    var errors = 0;

    // Line numbers are one-based; line 1 is the header, so start at line 2.
    for (var lineNumber = 2; lineNumber <= lines.length; lineNumber++) {
      final line = lines[lineNumber - 1];
      try {
        final userData = parseCsvLine(line);
        // Validate data
        if (isValidUserData(userData)) {
          // Create user
          await User.create(userData);
          imported += 1;
          // Progress logging
          if (imported % 100 == 0) {
            print('📊 Imported $imported users...');
          }
        } else {
          skipped += 1;
        }
      } catch (error) {
        errors += 1;
        print('❌ Error importing line $lineNumber: $error');
      }
    }

    print('✅ Import completed: $imported imported, $skipped skipped, $errors errors');

    // Cleanup
    await file.delete();

    // Send completion notification
    final summary = {
      'file': csvFilePath,
      'imported': imported,
      'skipped': skipped,
      'errors': errors,
    };
    await NotificationService.send('Data import completed', summary);
  }
}
// Data cleanup job
/// Queue job that deletes logs, sessions and read notifications older than
/// a cutoff, then writes an audit entry with the results.
class CleanupOldDataJob extends QueueJob {
  CleanupOldDataJob(this.olderThan);

  /// Age beyond which records are deleted.
  final Duration olderThan;

  /// Human-readable job name, including the age in days.
  @override
  String get displayName => 'Cleanup Data Older Than ${olderThan.inDays} Days';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 2;

  @override
  Future<void> handle() async {
    // Everything strictly older than this instant is removed.
    final cutoff = DateTime.now().subtract(olderThan);

    // Clean up old logs
    final logsQuery = Database.table('logs').where('created_at', '<', cutoff);
    final deletedLogs = await logsQuery.delete();

    // Clean up old sessions
    final sessionsQuery =
        Database.table('sessions').where('last_activity', '<', cutoff);
    final deletedSessions = await sessionsQuery.delete();

    // Clean up old notifications (only those already read)
    final notificationsQuery = Database.table('notifications')
        .where('created_at', '<', cutoff)
        .where('read', true);
    final deletedNotifications = await notificationsQuery.delete();

    print('🧹 Cleanup completed:');
    print(' Logs deleted: $deletedLogs');
    print(' Sessions deleted: $deletedSessions');
    print(' Notifications deleted: $deletedNotifications');

    // Log cleanup operation
    final auditEntry = {
      'older_than_days': olderThan.inDays,
      'logs_deleted': deletedLogs,
      'sessions_deleted': deletedSessions,
      'notifications_deleted': deletedNotifications,
    };
    await AuditLogger.log('data_cleanup', auditEntry);
  }
}
Scheduled Tasks
dart
// Daily report job
/// Queue job that generates, stores and emails the report for the previous
/// day, then flags reports older than 30 days as archived.
class GenerateDailyReportJob extends QueueJob {
  /// Human-readable job name.
  @override
  String get displayName => 'Generate Daily Report';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 3;

  @override
  Future<void> handle() async {
    const oneDay = Duration(days: 1);
    final yesterday = DateTime.now().subtract(oneDay);
    final reportData = await generateReport(yesterday);

    // Save report to database
    final reportFields = {
      'date': yesterday,
      'data': reportData,
      'generated_at': DateTime.now(),
    };
    final report = await Report.create(reportFields);

    // Send report via email
    await EmailService.sendReport(report);

    // Archive old reports (older than 30 days)
    final archiveCutoff = DateTime.now().subtract(Duration(days: 30));
    await Report.where('date', '<', archiveCutoff).update({'archived': true});

    // NOTE(review): toDateString() is not a dart:core DateTime member;
    // presumably a project extension - confirm.
    print('✅ Daily report generated for ${yesterday.toDateString()}');
  }
}
// Schedule daily reports
/// Schedules [GenerateDailyReportJob] runs on a [QueueManager].
class ReportScheduler {
  final QueueManager _queueManager;

  ReportScheduler(this._queueManager);

  /// Dispatches the first daily report so it becomes due at 9:00 AM local
  /// time tomorrow.
  ///
  /// Only this one report is dispatched here; recurring runs need a
  /// cron-like system or a job that schedules its successor.
  void scheduleDailyReports() {
    // Read the clock once so the target and the delay share the same "now".
    final now = DateTime.now();

    // Build the target from calendar fields. This zeroes minutes, seconds
    // and sub-second parts (copyWith(hour, minute, second) would keep the
    // current milliseconds) and stays on the wall-clock hour across DST
    // changes, which adding Duration(days: 1) does not. The DateTime
    // constructor rolls `day + 1` over at month end.
    final tomorrow9AM = DateTime(now.year, now.month, now.day + 1, 9);
    final initialDelay = tomorrow9AM.difference(now);

    // NOTE(review): the result of dispatch is not awaited because this
    // method is synchronous; dispatch errors are not observed here.
    _queueManager.dispatch(
      GenerateDailyReportJob(),
      delay: initialDelay,
    );
    // Schedule recurring reports (this would be handled by a cron-like system)
    // For now, each report schedules the next one
  }
}
// Maintenance job
/// Queue job that runs the database maintenance routines in sequence and
/// logs how long the run took.
class DatabaseMaintenanceJob extends QueueJob {
  /// Optimizes, analyzes, cleans up and re-indexes the database, in that
  /// order, then logs the completion time and elapsed duration.
  @override
  Future<void> handle() async {
    print('🔧 Starting database maintenance...');
    // Time the run so the log carries a real duration instead of a
    // placeholder value.
    final stopwatch = Stopwatch()..start();
    // Optimize tables
    await Database.optimizeTables();
    // Update statistics
    await Database.analyzeTables();
    // Clean up orphaned records
    await Database.cleanupOrphanedRecords();
    // Rebuild indexes
    await Database.rebuildIndexes();
    stopwatch.stop();
    print('✅ Database maintenance completed');
    // Log maintenance completion
    await SystemLogger.info('Database maintenance completed', {
      'timestamp': DateTime.now().toIso8601String(),
      'duration': '${stopwatch.elapsedMilliseconds}ms',
    });
  }

  /// Human-readable job name.
  @override
  String get displayName => 'Database Maintenance';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 2;
}
// Queue one maintenance run that becomes due a week from now. This is a
// single delayed dispatch; it does not repeat on its own.
const maintenanceDelay = Duration(days: 7);
final maintenanceJob = DatabaseMaintenanceJob();
await queueManager.dispatch(maintenanceJob, delay: maintenanceDelay);
Custom Drivers
dart
// Custom database queue driver
/// Example queue driver that stores jobs as rows of a `jobs` table.
///
/// A row moves from `pending` to `processing` and then to `completed` or
/// `failed`. A failed execution is put back to `pending` with a later
/// `scheduled_at` until its retries are used up.
class DatabaseQueueDriver implements QueueDriver {
  final DatabaseConnection _db;

  /// Counter appended to generated ids so that two jobs pushed within the
  /// same clock tick by this driver instance still get distinct ids.
  int _sequence = 0;

  DatabaseQueueDriver(this._db);

  /// Inserts [job] as a `pending` row that becomes due after [delay]
  /// (immediately when [delay] is null).
  @override
  Future<void> push(QueueJob job, {Duration? delay}) async {
    final now = DateTime.now();
    final scheduledAt = now.add(delay ?? Duration.zero);
    await _db.table('jobs').insert({
      // A bare millisecond timestamp collides when several jobs are pushed
      // in the same millisecond (for example a batch), and every later
      // `where('id', ...)` update would then hit more than one row.
      'id': '${now.microsecondsSinceEpoch}-${_sequence++}',
      'type': job.runtimeType.toString(),
      'payload': jsonEncode(job.toJson()),
      'scheduled_at': scheduledAt.toIso8601String(),
      'created_at': now.toIso8601String(),
      'attempts': 0,
      'max_retries': job.maxRetries,
      'status': 'pending',
    });
    print('🗄️ Job queued to database: ${job.displayName}');
  }

  /// Executes the oldest due `pending` job, if there is one.
  ///
  /// An error thrown by the job is not propagated: the row is either
  /// rescheduled or marked `failed`.
  @override
  Future<void> process() async {
    // Find next pending job
    final jobRecord = await _db.table('jobs')
        .where('status', 'pending')
        .where('scheduled_at', '<=', DateTime.now().toIso8601String())
        .orderBy('created_at')
        .first();
    if (jobRecord == null) return;

    // Mark as processing
    await _db.table('jobs')
        .where('id', jobRecord['id'])
        .update({'status': 'processing'});

    try {
      // Create job instance and execute
      final job = _createJobFromRecord(jobRecord);
      await job.handle();
      // Mark as completed
      await _db.table('jobs')
          .where('id', jobRecord['id'])
          .update({'status': 'completed'});
      print('🗄️ Job completed: ${job.displayName}');
    } catch (e) {
      // Handle failure and retry logic
      final int attempts = jobRecord['attempts'] + 1;
      final int maxRetries = jobRecord['max_retries'];

      // `attempts` counts executions, and the first execution is not a
      // retry, so the retries are used up only once attempts exceeds
      // maxRetries. With `>=`, a job with maxRetries: 1 was never retried.
      if (attempts > maxRetries) {
        await _db.table('jobs')
            .where('id', jobRecord['id'])
            .update({
              'status': 'failed',
              // Record the final count so the row shows how often it ran.
              'attempts': attempts,
              'error': e.toString(),
            });
        print('🗄️ Job failed permanently: ${jobRecord['type']}');
      } else {
        // Schedule retry; the delay grows linearly: 30s, 60s, 90s, ...
        final retryDelay = Duration(seconds: 30 * attempts);
        final nextAttempt = DateTime.now().add(retryDelay);
        await _db.table('jobs')
            .where('id', jobRecord['id'])
            .update({
              'status': 'pending',
              'attempts': attempts,
              'scheduled_at': nextAttempt.toIso8601String(),
            });
        print('🗄️ Job retry scheduled: ${jobRecord['type']}');
      }
    }
  }

  /// Rebuilds a job from its stored `type` and JSON `payload`.
  QueueJob _createJobFromRecord(Map<String, dynamic> record) {
    // This would need a job registry/factory
    // Simplified version for demonstration
    final payload = jsonDecode(record['payload']);
    return GenericQueueJob.fromPayload(record['type'], payload);
  }
}
// Make the custom driver available under the name 'database'.
final databaseDriver = DatabaseQueueDriver(database);
QueueFactory.registerDriver('database', databaseDriver);

// Select it through the regular driver setting.
final config = {
  'queue': {'driver': 'database'},
};
final queueManager = QueueManager(config);
Driver Interface
- `push(job, delay)` - Add job to queue
- `process()` - Process next available job
- Implement proper error handling
- Support delayed job execution
Best Practices
✅ Do's
- • Use appropriate drivers for your environment (memory for dev, file/redis for prod)
- • Set reasonable retry limits to prevent infinite retry loops
- • Monitor queue metrics and set up alerts for failures
- • Use delayed jobs for scheduled tasks and background processing
- • Handle errors gracefully in job implementations
- • Use batch processing for bulk operations
- • Configure worker timeouts to prevent hanging jobs
- • Log important job events for debugging
- • Test job implementations thoroughly
- • Use descriptive job names and proper error messages
❌ Don'ts
- • Don't perform long-running operations without proper timeouts
- • Don't rely on memory driver for production (jobs lost on restart)
- • Don't ignore job failures - always handle errors appropriately
- • Don't create jobs with side effects that can't be retried safely
- • Don't use synchronous driver for background processing
- • Don't forget to start queue workers in production
- • Don't use very short delays that could overwhelm the system
- • Don't log sensitive information in job error messages
- • Don't create jobs that depend on external state that may change
- • Don't forget to monitor queue health and performance
Performance Considerations
Optimization Tips
- • Memory driver is fastest but loses jobs on restart
- • File driver provides persistence with reasonable performance
- • Use appropriate worker delays to balance throughput and resource usage
- • Configure proper timeouts to prevent hanging jobs
- • Monitor memory usage with large job payloads
- • Consider job size limits for very large payloads
Resource Management
- • Set reasonable worker limits to prevent resource exhaustion
- • Use background workers for long-running applications
- • Configure proper error handling to prevent worker crashes
- • Monitor CPU and memory usage of queue workers
- • Use connection pooling for database/file operations
Testing Queue Jobs
dart
// Unit tests for queue jobs
/// Test suite for queue jobs, run against the in-memory driver.
void main() {
  group('QueueJob Tests', () {
    late QueueManager queueManager;

    setUp(() async {
      // Use memory driver for testing; a fresh manager per test keeps the
      // tests independent of each other.
      final config = {'queue': {'driver': 'memory'}};
      queueManager = QueueManager(config);
      await queueManager.init();
    });

    test('Simple job executes successfully', () async {
      final job = TestJob();
      await queueManager.dispatch(job);
      // Process the job
      await queueManager.process();
      expect(job.executed, isTrue);
      expect(job.executionCount, equals(1));
    });

    test('Job with retry logic', () async {
      final job = FailingJob(maxFailures: 2);
      await queueManager.dispatch(job);
      // Process multiple times to test retries
      await queueManager.process(); // Fail 1
      await queueManager.process(); // Fail 2
      await queueManager.process(); // Success
      expect(job.executionCount, equals(3));
      expect(job.succeeded, isTrue);
    });

    test('Delayed job execution', () async {
      final job = TestJob();
      final delay = Duration(seconds: 1);
      await queueManager.dispatch(job, delay: delay);
      // Job should not execute immediately
      await queueManager.process();
      expect(job.executed, isFalse);
      // Wait for delay and process again
      await Future.delayed(delay + Duration(milliseconds: 100));
      await queueManager.process();
      expect(job.executed, isTrue);
    });

    test('Batch job processing', () async {
      final jobs = [
        TestJob(),
        TestJob(),
        TestJob(),
      ];
      await queueManager.dispatchBatch(jobs);
      // Process all jobs
      for (var i = 0; i < jobs.length; i++) {
        await queueManager.process();
      }
      for (final job in jobs) {
        expect(job.executed, isTrue);
      }
    });

    test('Worker processes jobs correctly', () async {
      final jobs = List.generate(5, (_) => TestJob());
      for (final job in jobs) {
        await queueManager.dispatch(job);
      }
      // Start worker with short delay
      await queueManager.startWorker(
        maxJobs: 5,
        delay: Duration(milliseconds: 10),
      );
      // Wait for all jobs to complete. Five jobs need at least
      // 5 x (10 ms work + 10 ms delay) = 100 ms, so waiting exactly 100 ms
      // races the worker; leave a generous margin.
      await Future.delayed(Duration(milliseconds: 500));
      for (final job in jobs) {
        expect(job.executed, isTrue);
      }
    });

    test('Error handling in jobs', () async {
      final job = AlwaysFailingJob();
      await queueManager.dispatch(job);
      // Process and expect failure
      await queueManager.process();
      expect(job.executionCount, equals(1));
      expect(job.failed, isTrue);
    });
  });
}
// Test job implementations
/// Test double that records whether and how often it was executed.
class TestJob extends QueueJob {
  /// How many times [handle] has been called.
  int executionCount = 0;

  /// Whether [handle] has been called at least once.
  bool executed = false;

  /// Human-readable job name.
  @override
  String get displayName => 'Test Job';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 3;

  /// Records the execution, then simulates 10 ms of work.
  @override
  Future<void> handle() async {
    executed = true;
    executionCount += 1;
    const simulatedWork = Duration(milliseconds: 10);
    await Future.delayed(simulatedWork); // Simulate work
  }
}
/// Test double that throws on its first [maxFailures] executions and
/// succeeds afterwards.
class FailingJob extends QueueJob {
  FailingJob({this.maxFailures = 2});

  /// Number of initial executions that throw.
  final int maxFailures;

  /// Whether an execution has completed without throwing.
  bool succeeded = false;

  /// How many times [handle] has been called.
  int executionCount = 0;

  /// Human-readable job name.
  @override
  String get displayName => 'Failing Job';

  /// Maximum retry attempts.
  @override
  int get maxRetries => 5;

  /// Counts the execution and throws while the count is within
  /// [maxFailures].
  @override
  Future<void> handle() async {
    executionCount += 1;
    final stillFailing = executionCount <= maxFailures;
    if (stillFailing) {
      throw Exception('Simulated failure #$executionCount');
    }
    succeeded = true;
  }
}
/// Test double whose every execution throws.
class AlwaysFailingJob extends QueueJob {
  /// Whether [handle] has been called at least once.
  bool failed = false;

  /// How many times [handle] has been called.
  int executionCount = 0;

  /// Human-readable job name.
  @override
  String get displayName => 'Always Failing Job';

  /// Zero: no retries.
  @override
  int get maxRetries => 0;

  /// Records the execution, then throws.
  @override
  Future<void> handle() async {
    executionCount += 1;
    failed = true;
    throw Exception('This job always fails');
  }
}
Testing Strategies
- • Test job execution in isolation
- • Mock external dependencies
- • Test error handling and retry logic
- • Test delayed job scheduling
- • Test batch job processing
- • Verify job serialization/deserialization
Troubleshooting
Common Issues
- Jobs not processing: Check if workers are running and driver is configured
- Jobs lost on restart: Use persistent driver (file/redis) instead of memory
- High memory usage: Monitor job payload sizes and worker limits
- Jobs timing out: Increase timeout or optimize job execution
- Worker crashes: Add proper error handling and logging
- Delayed jobs not executing: Check system clock and delay calculations
Debug Commands
dart
// Debug queue status
/// Prints the driver name and the headline metrics of [queueManager],
/// followed by per-type queued counts when the metrics include them.
void debugQueueStatus(QueueManager queueManager) {
  final metrics = queueManager.getMetrics();
  // success_rate is multiplied by 100 to display it as a percentage.
  final successPercent = (metrics['success_rate'] * 100).toStringAsFixed(1);

  print('🔍 Queue Debug Information:');
  print('Driver: ${queueManager.defaultDriverName}');
  print('Total Queued: ${metrics['total_queued']}');
  print('Currently Processing: ${metrics['currently_processing']}');
  print('Completed: ${metrics['total_completed']}');
  print('Failed: ${metrics['total_failed']}');
  print('Success Rate: $successPercent%');

  final queuedByType = metrics['queued_by_type'];
  if (queuedByType == null) {
    return;
  }
  print('Jobs by Type:');
  for (final entry in (queuedByType as Map<String, dynamic>).entries) {
    print(' ${entry.key}: ${entry.value}');
  }
}
// Inspect file queue contents
/// Prints a summary of the first ten jobs stored in the queue file at
/// [queuePath].
///
/// Accepts both a bare JSON array of jobs and the documented file-driver
/// layout, an object with a `jobs` array. A missing file or unreadable
/// content is reported with a printed message instead of an exception.
void inspectFileQueue(String queuePath) {
  final file = File(queuePath);
  if (!file.existsSync()) {
    print('❌ Queue file does not exist: $queuePath');
    return;
  }
  try {
    final decoded = jsonDecode(file.readAsStringSync());

    // The file driver writes { "jobs": [...] }; casting that straight to a
    // List fails, so unwrap the object form first.
    final List<dynamic> data;
    if (decoded is List) {
      data = decoded;
    } else if (decoded is Map && decoded['jobs'] is List) {
      data = decoded['jobs'] as List<dynamic>;
    } else {
      throw FormatException(
          'Expected a job list or an object with a "jobs" list');
    }

    print('📁 File Queue Contents (${data.length} jobs):');
    for (var i = 0; i < data.length && i < 10; i++) {
      final job = data[i] as Map<String, dynamic>;
      final scheduledAt = DateTime.parse(job['scheduledAt']);
      final isDue = scheduledAt.isBefore(DateTime.now());
      print('${i + 1}. ${job['type']}');
      print(' ID: ${job['id']}');
      print(' Scheduled: $scheduledAt');
      print(' Due: ${isDue ? '✅' : '⏰'}');
      print(' Attempts: ${job['attempts']}/${job['maxRetries']}');
      print('');
    }
    if (data.length > 10) {
      print('... and ${data.length - 10} more jobs');
    }
  } catch (e) {
    // Best-effort debug helper: report the problem rather than crash.
    print('❌ Failed to read queue file: $e');
  }
}
// Clear stuck jobs
/// Removes jobs that have been in `processing` state for more than 30
/// minutes from the file queue at `storage/queue/jobs.json`.
///
/// [queueManager] is currently unused: the path is fixed and the logic is
/// specific to the file driver. Does nothing when the file does not exist.
Future<void> clearStuckJobs(QueueManager queueManager) async {
  // This would depend on the driver implementation
  // For file driver:
  final file = File('storage/queue/jobs.json');
  if (!file.existsSync()) {
    return;
  }

  final decoded = jsonDecode(await file.readAsString());
  // The documented file layout is { "jobs": [...] }. Remember the wrapper so
  // it can be written back unchanged, while still accepting a bare list.
  final wrapper = decoded is Map ? decoded : null;
  final jobs = (wrapper != null ? wrapper['jobs'] : decoded) as List<dynamic>;

  // Remove jobs that have been processing for too long
  final now = DateTime.now();
  final maxProcessingTime = Duration(minutes: 30);
  jobs.removeWhere((job) {
    if (job['status'] != 'processing') {
      return false;
    }
    // NOTE(review): the age is measured from scheduledAt, the only
    // timestamp read here, not from when processing started - confirm.
    final scheduledAt = DateTime.parse(job['scheduledAt']);
    return now.difference(scheduledAt) > maxProcessingTime;
  });

  // writeAsStringSync returns void, so awaiting it did nothing; use the
  // asynchronous call this async function can actually await.
  await file.writeAsString(jsonEncode(wrapper ?? jobs));
  print('🧹 Cleared stuck jobs from file queue');
}
// Monitor queue health
/// Prints a queue health line once a minute, plus warnings when the failure
/// rate, the number of in-flight jobs or the backlog is high.
class QueueHealthMonitor {
  final QueueManager _queueManager;

  /// The active periodic timer, or `null` while monitoring is stopped.
  ///
  /// It is kept so the monitor can be stopped; a discarded [Timer.periodic]
  /// can never be cancelled.
  Timer? _timer;

  QueueHealthMonitor(this._queueManager);

  /// Starts a health check every minute.
  ///
  /// Calling this again replaces the running timer instead of adding a
  /// second one.
  void startMonitoring() {
    _timer?.cancel();
    _timer = Timer.periodic(Duration(minutes: 1), (_) => _checkHealth());
  }

  /// Stops the periodic health check; does nothing if it is not running.
  void stopMonitoring() {
    _timer?.cancel();
    _timer = null;
  }

  /// Reads the current metrics, prints warnings, then prints the summary.
  void _checkHealth() {
    final metrics = _queueManager.getMetrics();
    // Check for warning conditions
    if (metrics['failure_rate'] > 0.05) { // > 5% failure rate
      print('⚠️ WARNING: High failure rate ${(metrics['failure_rate'] * 100).toStringAsFixed(1)}%');
    }
    if (metrics['currently_processing'] > 20) {
      print('⚠️ WARNING: High concurrent processing: ${metrics['currently_processing']}');
    }
    if (metrics['total_queued'] > 1000) {
      print('⚠️ WARNING: Large queue backlog: ${metrics['total_queued']} jobs');
    }
    // Log periodic health check
    print('💚 Queue Health: ${metrics['total_completed']} completed, ${metrics['total_failed']} failed');
  }
}