Queue System

Simple and efficient background job processing with multiple drivers and monitoring capabilities.

Background Jobs · Multiple Drivers · Monitoring · Delayed Jobs · Retry Logic · File Persistence

Quick Start

Basic Job Processing

dart
// Initialize queue manager: build it from the application config, then call
// init() before dispatching any jobs.
final config = Config(); // Your config implementation
final queueManager = QueueManager(config);
await queueManager.init();

// Create and dispatch a simple job
/// Queue job that sends a welcome email to a single user.
class WelcomeEmailJob extends QueueJob {
  WelcomeEmailJob(this.userEmail, this.userName);

  /// Address the welcome email is sent to.
  final String userEmail;

  /// Name used when greeting the recipient.
  final String userName;

  @override
  String get displayName => 'Send Welcome Email';

  @override
  int get maxRetries => 3;

  /// Announces the send, waits one second to simulate work, then reports
  /// success.
  @override
  Future<void> handle() async {
    print('Sending welcome email to $userName at $userEmail');
    // Send email logic here
    final simulatedWork = Duration(seconds: 1);
    await Future.delayed(simulatedWork);
    print('✅ Welcome email sent!');
  }
}

// Put a welcome-email job on the queue
final job = WelcomeEmailJob('user@example.com', 'John Doe');
await queueManager.dispatch(job);

// Run a worker that reports the outcome of each job it handles
await queueManager.startWorker(
  delay: Duration(seconds: 2),
  onJobComplete: (finished, result) {
    print('Job completed: ${finished.displayName}');
  },
  onJobError: (failed, error, stack) {
    print('Job failed: ${failed.displayName} - $error');
  },
);

💡 Note: Jobs are processed asynchronously without complex serialization

⚡ Tip: Use delayed jobs for scheduled tasks and background processing

Creating Jobs

dart
// Simple job class
/// Queue job that simulates charging a payment for an order.
///
/// The simulated gateway fails at random, which makes this job a convenient
/// way to watch the queue's retry handling in action.
class ProcessPaymentJob extends QueueJob {
  /// Identifier of the order being paid for.
  final String orderId;

  /// Amount to charge.
  final double amount;

  ProcessPaymentJob(this.orderId, this.amount);

  /// Waits two seconds to simulate processing, then either throws a
  /// simulated gateway timeout or reports success.
  @override
  Future<void> handle() async {
    // `\$` prints a literal dollar sign; an unescaped `$$` is not a valid
    // string interpolation.
    print('Processing payment for order $orderId: \$$amount');

    // Simulate payment processing
    await Future.delayed(Duration(seconds: 2));

    // Random coin flip stands in for an unreliable payment gateway.
    if (Random().nextBool()) {
      throw Exception('Payment gateway timeout');
    }

    print('✅ Payment processed successfully');
  }

  @override
  String get displayName => 'Process Payment';

  @override
  int get maxRetries => 5;
}

// Job with custom serialization
/// Queue job that delivers a notification to one user, with explicit JSON
/// (de)serialization for its enum-typed field.
class SendNotificationJob extends QueueJob {
  SendNotificationJob(this.userId, this.message, this.type);

  /// Rebuilds a job from the map produced by [toJson].
  ///
  /// The stored type name is matched against [NotificationType.values].
  static SendNotificationJob fromJson(Map<String, dynamic> json) {
    return SendNotificationJob(
      json['userId'],
      json['message'],
      NotificationType.values.firstWhere(
        (candidate) => candidate.name == json['type'],
      ),
    );
  }

  /// Recipient of the notification.
  final int userId;

  /// Text of the notification.
  final String message;

  /// Channel the notification is sent through.
  final NotificationType type;

  @override
  String get displayName => 'Send ${type.name} Notification';

  @override
  int get maxRetries => 3;

  /// Logs the send, waits 500 ms to simulate delivery, then reports success.
  @override
  Future<void> handle() async {
    print('Sending $type notification to user $userId: $message');

    // Send notification logic
    final simulatedDelivery = Duration(milliseconds: 500);
    await Future.delayed(simulatedDelivery);

    print('✅ Notification sent');
  }

  /// Serializes the job; the enum is stored by name.
  @override
  Map<String, dynamic> toJson() {
    return {
      'userId': userId,
      'message': message,
      'type': type.name,
    };
  }
}

/// Notification channels accepted by `SendNotificationJob`.
enum NotificationType { email, sms, push }

Job Interface

  • handle() - Main job execution method
  • displayName - Human-readable job name
  • maxRetries - Maximum retry attempts
  • toJson() - Optional serialization for complex jobs

Queue Drivers

Memory Driver

dart
// Memory driver: quick, but queued jobs do not survive a restart
final config = {'queue': {'driver': 'memory'}};

// Usage
final queueManager = QueueManager(config);
await queueManager.init();

// The dispatched job lives in memory only
await queueManager.dispatch(
  WelcomeEmailJob('user@example.com', 'John'),
);

// ⚠️  Jobs will be lost if application restarts
// ✅ Perfect for development and testing
// ✅ Fastest performance

File Driver

dart
// File driver: queued jobs are written to disk
final config = {
  'queue': {
    'driver': 'file',
    'file': {'path': 'storage/queue/jobs.json'},
  },
};

// Usage
final queueManager = QueueManager(config);
await queueManager.init();

// The dispatched job is saved to the file and survives restarts
await queueManager.dispatch(
  ImportantJob(),
);

// File structure:
// storage/queue/jobs.json
// {
//   "jobs": [
//     {
//       "id": "123456789",
//       "type": "ImportantJob",
//       "payload": {...},
//       "scheduledAt": "2024-01-15T10:30:00.000Z",
//       "attempts": 0,
//       "maxRetries": 3
//     }
//   ]
// }

Sync Driver

dart
// Sync driver: each job runs as soon as it is dispatched
final config = {'queue': {'driver': 'sync'}};

// Usage
final queueManager = QueueManager(config);
await queueManager.init();

// No queuing happens; the job executes right away
await queueManager.dispatch(
  QuickJob(),
);
// Output: Job executed immediately!

// ✅ No background processing needed
// ✅ Perfect for simple synchronous operations
// ❌ No actual queuing - jobs run in main thread

Driver Comparison

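| Driver | Persistence | Performance | Use Case |
|--------|-------------|-------------|----------|
| Memory | ❌ None | ⚡ Fastest | Development/Testing |
| File | ✅ Disk | 🟡 Medium | Simple persistence |
| Sync | ❌ None | ⚡ Immediate | Synchronous processing |
| Redis | ✅ Redis | 🟡 Fast | Production clusters |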

Configuration

dart
// Minimal setup: only the driver is chosen (memory, file, sync, redis)
final config = {
  'queue': {'driver': 'memory'},
};

// Settings for the file driver
final fileConfig = {
  'queue': {
    'driver': 'file',
    'file': {
      'path': 'storage/queue/jobs.json',
      'max_file_size': 5 * 1024 * 1024, // 5MB
      'max_backups': 5,
    },
  },
};

// Settings for the Redis driver
final redisConfig = {
  'queue': {
    'driver': 'redis',
    'redis': {
      'connection': 'default',
      'queue': 'default',
      'retry_after': 90,
    },
  },
};

// Settings for workers
final workerConfig = {
  'queue': {
    'worker': {
      'delay': 1, // seconds between jobs
      'max_jobs': 1000,
      'timeout': 60 * 60, // 1 hour, in seconds
    },
  },
};

// Create the manager from the chosen configuration and initialize it
final queueManager = QueueManager(config);
await queueManager.init();

// Make a custom driver available under the name 'custom'
QueueFactory.registerDriver(
  'custom',
  CustomQueueDriver(),
);

Configuration Options

  • driver - Queue driver (memory, file, sync, redis)
  • file.path - File path for file driver
  • redis.connection - Redis connection for redis driver
  • worker.delay - Delay between job processing
  • worker.max_jobs - Maximum jobs per worker

Delayed Jobs

dart
// Immediate job execution
await queueManager.dispatch(UrgentJob());

// Delayed job execution
await queueManager.dispatch(
  ScheduledJob(),
  delay: Duration(minutes: 5),
);

// Various delay examples
final delays = [
  Duration(seconds: 30),     // 30 seconds
  Duration(minutes: 5),      // 5 minutes
  Duration(hours: 1),        // 1 hour
  Duration(days: 1),         // 1 day
  Duration(hours: 24),       // 24 hours
];

// Schedule newsletter for 9 AM tomorrow.
// `delay` is relative to now, so a fixed Duration(hours: 9) would mean
// "nine hours from now". Compute the gap to the target wall-clock time
// instead; the DateTime constructor normalizes `day + 1` across month and
// year boundaries.
final now = DateTime.now();
final tomorrowAt9 = DateTime(now.year, now.month, now.day + 1, 9);
await queueManager.dispatch(
  SendNewsletterJob(),
  delay: tomorrowAt9.difference(now),
);

// Retry with backoff
/// Queue job that re-dispatches itself with an exponentially growing delay
/// whenever its operation fails.
class RetryWithBackoffJob extends QueueJob {
  RetryWithBackoffJob([this.attempt = 0]);

  /// Zero-based count of attempts made before this one.
  final int attempt;

  @override
  String get displayName => 'Retry Operation (Attempt ${attempt + 1})';

  @override
  int get maxRetries => 5;

  /// Runs the operation once. On failure, either schedules a follow-up job
  /// or gives up when [attempt] has reached [maxRetries].
  @override
  Future<void> handle() async {
    try {
      await riskyOperation();
      print('✅ Operation succeeded on attempt ${attempt + 1}');
    } catch (e) {
      if (attempt >= maxRetries) {
        print('❌ Operation failed permanently after ${attempt + 1} attempts');
        return;
      }

      // Doubling delay: 30s, 1m, 2m, 4m, 8m...
      final backoffSeconds = 30 * (1 << attempt);
      final delay = Duration(seconds: backoffSeconds);
      await queueManager.dispatch(
        RetryWithBackoffJob(attempt + 1),
        delay: delay,
      );
      print('⏰ Retrying in ${delay.inSeconds} seconds...');
    }
  }
}

Delay Examples

  • Duration(seconds: 30) - 30 seconds delay
  • Duration(minutes: 5) - 5 minutes delay
  • Duration(hours: 1) - 1 hour delay
  • Duration(days: 1) - 1 day delay
  • null - Immediate processing

Queue Workers

dart
// Worker with every option left at its default
await queueManager.startWorker();

// Worker with explicit limits
await queueManager.startWorker(
  runInBackground: true,           // Run in background
  timeout: Duration(hours: 1),     // Stop after 1 hour
  delay: Duration(seconds: 5),     // 5 second delay between jobs
  maxJobs: 100,                    // Process max 100 jobs
);

// Worker that reports every lifecycle event
await queueManager.startWorker(
  onJobStart: (job) => print('🚀 Starting job: ${job.displayName}'),
  onJobComplete: (job, result) => print('✅ Completed job: ${job.displayName}'),
  // Send alert, log to external service, etc.
  onJobError: (job, error, stack) =>
      print('❌ Failed job: ${job.displayName} - $error'),
  // Handle worker-level errors
  onError: (error, stack) => print('💥 Worker error: $error'),
);

// Production-style worker that keeps running
await queueManager.startWorker(
  runInBackground: true,
  timeout: null,                       // No timeout
  maxJobs: null,                       // Run indefinitely
  delay: Duration(milliseconds: 500),  // Fast processing
);

// Development worker that logs each step
await queueManager.startWorker(
  delay: Duration(seconds: 2),
  onJobStart: (job) {
    print('[DEV] Processing: ${job.displayName}');
  },
  onJobComplete: (job, result) {
    print('[DEV] Completed: ${job.displayName}');
  },
  onJobError: (job, error, stack) {
    print('[DEV] Error in ${job.displayName}: $error');
  },
);

Worker Configuration

  • maxJobs - Maximum jobs to process before stopping
  • delay - Delay between processing jobs
  • timeout - Maximum time to run
  • runInBackground - Run worker in background
  • onError - Error callback function
  • onJobStart/Complete/Error - Job lifecycle callbacks

Batch Processing

dart
// Build a mixed list of jobs to send in one call
final jobs = [
  for (var n = 1; n <= 3; n++)
    SendEmailJob('user$n@example.com', 'Welcome User $n'),
  ProcessPaymentJob('order_123', 99.99),
  UpdateInventoryJob('product_456', -5),
];

// Queue the whole list together
await queueManager.dispatchBatch(jobs);

// Queue the same list with a 10-second delay
final batchDelay = Duration(seconds: 10);
await queueManager.dispatchBatch(jobs, delay: batchDelay);

// Batch processing job
/// Queue job that sends one templated email to each address in a list.
class BulkEmailJob extends QueueJob {
  BulkEmailJob(this.emails, this.subject, this.template);

  /// Recipient addresses.
  final List<String> emails;

  /// Subject line shared by every email.
  final String subject;

  /// Body template; each `{{email}}` placeholder is replaced with the
  /// recipient's address.
  final String template;

  @override
  String get displayName => 'Send Bulk Email (${emails.length} recipients)';

  @override
  int get maxRetries => 2;

  /// Sends to each recipient in turn. A failure for one address is printed
  /// and does not stop the remaining sends.
  @override
  Future<void> handle() async {
    print('Sending ${emails.length} emails...');

    for (final recipient in emails) {
      try {
        await sendEmail(
          recipient,
          subject,
          template.replaceAll('{{email}}', recipient),
        );
        print('✅ Sent to $recipient');
      } catch (e) {
        // Keep going with the remaining recipients
        print('❌ Failed to send to $recipient: $e');
      }
    }

    print('✅ Bulk email job completed');
  }
}

// Usage: dispatch one BulkEmailJob covering three recipients. The
// `{{email}}` placeholder in the template is replaced per recipient by the job.
await queueManager.dispatch(BulkEmailJob(
  ['user1@example.com', 'user2@example.com', 'user3@example.com'],
  'Monthly Newsletter',
  'Hello {{email}}, here is your newsletter...',
));

Batch Benefits

  • Efficient bulk operations
  • Reduced overhead for multiple jobs
  • Atomic batch processing
  • Better resource utilization

Monitoring and Metrics

dart
// Read a snapshot of the queue metrics and print the headline numbers
final metrics = queueManager.getMetrics();
print('Queue Metrics:');
print('  Total Queued: ${metrics['total_queued']}');
print('  Total Completed: ${metrics['total_completed']}');
print('  Total Failed: ${metrics['total_failed']}');
print('  Currently Processing: ${metrics['currently_processing']}');
print('  Success Rate: ${(metrics['success_rate'] * 100).toStringAsFixed(1)}%');
print('  Average Processing Time: ${metrics['average_processing_time_ms']}ms');

// Break completed jobs down by job type
final completedByType = metrics['completed_by_type'] as Map<String, int>;
print('Jobs by Type:');
for (final entry in completedByType.entries) {
  print('  ${entry.key}: ${entry.value}');
}

// Clear the counters (useful for testing)
queueManager.resetMetrics();

// Custom monitoring
/// Periodically inspects a [QueueManager]'s metrics and prints warnings and
/// a health summary.
class QueueMonitor {
  final QueueManager _queueManager;

  /// Handle of the running periodic check, kept so it can be cancelled.
  Timer? _timer;

  QueueMonitor(this._queueManager);

  /// Starts a health check that runs every five minutes.
  ///
  /// Calling this again replaces the running check rather than stacking a
  /// second timer on top of it.
  void startMonitoring() {
    _timer?.cancel();
    _timer = Timer.periodic(Duration(minutes: 5), (_) => _checkHealth());
  }

  /// Stops the periodic health check, if one is running.
  void stopMonitoring() {
    _timer?.cancel();
    _timer = null;
  }

  /// Reads the current metrics, prints a warning for each threshold that is
  /// exceeded, then prints a short summary.
  void _checkHealth() {
    final metrics = _queueManager.getMetrics();

    // Check for issues
    if (metrics['failure_rate'] > 0.1) { // > 10% failure rate
      print('⚠️  High failure rate detected: ${(metrics['failure_rate'] * 100).toStringAsFixed(1)}%');
      // Send alert
    }

    if (metrics['currently_processing'] > 10) { // Too many jobs processing
      print('⚠️  High concurrent jobs: ${metrics['currently_processing']}');
      // Scale up workers
    }

    // Log metrics
    print('📊 Queue Health Check:');
    print('   Success Rate: ${(metrics['success_rate'] * 100).toStringAsFixed(1)}%');
    print('   Processing: ${metrics['currently_processing']} jobs');
    print('   Queue Size: ${metrics['total_queued']} jobs');
  }
}

Available Metrics

  • total_queued - Total jobs queued
  • total_completed - Jobs successfully completed
  • total_failed - Jobs that failed
  • currently_processing - Jobs currently being processed
  • success_rate - Fraction of successful jobs (0.0–1.0; multiply by 100 for a percentage)
  • average_processing_time_ms - Average job processing time in milliseconds

Error Handling and Retries

dart
// Job with error handling
/// Queue job that processes one order: looks it up, charges payment,
/// decrements inventory, and sends a confirmation email.
///
/// Any failure is logged and then rethrown, so the queue sees the job as
/// failed and can retry it according to [maxRetries].
class ProcessOrderJob extends QueueJob {
  /// Identifier of the order to process.
  final String orderId;

  ProcessOrderJob(this.orderId);

  @override
  Future<void> handle() async {
    try {
      // Step 1: Validate order
      final order = await Order.find(orderId);
      if (order == null) {
        throw Exception('Order not found: $orderId');
      }

      // Step 2: Process payment
      await PaymentService.charge(order);

      // Step 3: Update inventory
      for (final item in order.items) {
        await Inventory.decrement(item.productId, item.quantity);
      }

      // Step 4: Send confirmation
      await EmailService.sendOrderConfirmation(order);

      print('✅ Order $orderId processed successfully');

    } catch (e) {
      print('❌ Failed to process order $orderId: $e');

      // Log detailed error information
      // NOTE(review): `attempt` is not declared in this class; presumably it
      // is inherited from QueueJob — confirm.
      await ErrorLogger.log('order_processing_failed', {
        'orderId': orderId,
        'error': e.toString(),
        'timestamp': DateTime.now().toIso8601String(),
        'attempt': attempt,
      });

      // Propagate the failure. Swallowing it here would make the job look
      // successful to the queue, so it would never be retried.
      // NOTE(review): a retry re-runs every step, including the payment
      // charge — confirm PaymentService.charge is safe to repeat.
      rethrow;
    }
  }

  @override
  String get displayName => 'Process Order $orderId';

  @override
  int get maxRetries => 3;
}

// Worker with error handling
await queueManager.startWorker(
  onJobError: (job, error, stack) async {
    // Log to external service
    await ExternalLogger.error('Queue job failed', {
      'job': job.displayName,
      'error': error.toString(),
      'stack': stack.toString(),
      'timestamp': DateTime.now().toIso8601String(),
    });

    // Send alert for critical jobs
    if (job.displayName.contains('Payment') || job.displayName.contains('Order')) {
      await AlertService.send('Critical job failed: ${job.displayName}');
    }
  },
  onError: (error, stack) async {
    // Worker-level error
    await ExternalLogger.critical('Queue worker error', {
      'error': error.toString(),
      'stack': stack.toString(),
    });

    // Restart worker or send alert
    await AlertService.send('Queue worker crashed - manual intervention required');
  },
);

// Graceful error recovery
/// Queue job that retries its operation internally instead of relying on
/// the queue's retry mechanism.
class ResilientJob extends QueueJob {
  @override
  String get displayName => 'Resilient Operation';

  @override
  int get maxRetries => 0; // Handle retries internally

  /// Tries the operation up to three times, pausing 5s and then 10s between
  /// tries, and throws once every try has failed.
  @override
  Future<void> handle() async {
    // Local limit for the in-job retry loop; shadows the getter above.
    const maxRetries = 3;

    for (var attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        await performRiskyOperation();
        print('✅ Operation succeeded');
        return;
      } catch (e) {
        print('⚠️  Attempt $attempt failed: $e');

        // Pause before the next try; no pause after the final one
        final hasTriesLeft = attempt < maxRetries;
        if (hasTriesLeft) {
          await Future.delayed(Duration(seconds: attempt * 5));
        }
      }
    }

    throw Exception('Operation failed after $maxRetries attempts');
  }
}

Retry Behavior

  • Jobs retry automatically on failure
  • Configurable maximum retry attempts
  • Exponential backoff for retries
  • Failed jobs are logged but don't crash workers
  • File driver persists retry state across restarts

Real-World Examples

Email Processing

dart
// Email processing job
/// Queue job that sends a personalized welcome email to a [User] and then
/// flags the user record as welcomed.
class SendWelcomeEmailJob extends QueueJob {
  /// The user being welcomed.
  final User user;

  SendWelcomeEmailJob(this.user);

  /// Builds the message, sends it, and updates the user. Failures are
  /// printed and rethrown so the queue can retry the job.
  @override
  Future<void> handle() async {
    try {
      // Personalize email content
      final subject = 'Welcome to Our Platform, ${user.firstName}!';
      final body = '''
        Hi ${user.firstName},

        Welcome to our platform! We're excited to have you here.

        Your account details:
        - Email: ${user.email}
        - Registration Date: ${user.createdAt}

        Best regards,
        The Team
      ''';

      // Send email
      await EmailService.send(
        to: user.email,
        subject: subject,
        body: body,
        html: generateWelcomeHtml(user),
      );

      // Update user status
      await user.update({'welcome_email_sent': true});

      print('✅ Welcome email sent to ${user.email}');

    } catch (e) {
      print('❌ Failed to send welcome email to ${user.email}: $e');
      // `rethrow` keeps the original stack trace; `throw e` would replace it
      // with one starting here. Let queue handle retry.
      rethrow;
    }
  }

  @override
  String get displayName => 'Send Welcome Email to ${user.email}';

  @override
  int get maxRetries => 3;
}

// Bulk email job
/// Queue job that sends one newsletter to every subscriber and records the
/// totals.
class SendNewsletterJob extends QueueJob {
  SendNewsletterJob(this.subscribers, this.newsletter);

  /// Users who receive the newsletter.
  final List<User> subscribers;

  /// The newsletter being sent.
  final Newsletter newsletter;

  @override
  String get displayName => 'Send Newsletter: ${newsletter.title}';

  @override
  int get maxRetries => 2;

  /// Sends to each subscriber in order, counting successes and failures,
  /// then prints and tracks the totals.
  @override
  Future<void> handle() async {
    final pauseBetweenSends = Duration(milliseconds: 100);
    var sent = 0;
    var failed = 0;

    for (final subscriber in subscribers) {
      try {
        await EmailService.sendNewsletter(subscriber, newsletter);
        sent += 1;
        print('✅ Newsletter sent to ${subscriber.email}');
      } catch (e) {
        failed += 1;
        print('❌ Failed to send newsletter to ${subscriber.email}: $e');
      }

      // Brief pause after every send so the email service is not flooded
      await Future.delayed(pauseBetweenSends);
    }

    print('📊 Newsletter sent: $sent success, $failed failed');

    // Record the outcome
    final summary = {
      'newsletter_id': newsletter.id,
      'total_sent': sent,
      'total_failed': failed,
    };
    await Analytics.track('newsletter_sent', summary);
  }
}

Data Processing

dart
// Data import job
/// Queue job that imports users from a CSV file, deletes the file, and
/// sends a summary notification.
class ImportUsersJob extends QueueJob {
  ImportUsersJob(this.csvFilePath);

  /// Path of the CSV file to import.
  final String csvFilePath;

  @override
  String get displayName => 'Import Users from CSV';

  @override
  int get maxRetries => 1;

  /// Reads the file, creates a user for each valid row after the header,
  /// and counts imported, skipped, and failed rows.
  @override
  Future<void> handle() async {
    final csvFile = File(csvFilePath);
    final rows = await csvFile.readAsLines();

    var imported = 0;
    var skipped = 0;
    var errors = 0;

    // Row 0 is the header, so start at row 1
    for (var rowIndex = 1; rowIndex < rows.length; rowIndex++) {
      try {
        final userData = parseCsvLine(rows[rowIndex]);

        if (isValidUserData(userData)) {
          // Create user
          await User.create(userData);
          imported++;

          // Report progress every 100 imported users
          if (imported % 100 == 0) {
            print('📊 Imported $imported users...');
          }
        } else {
          // Row failed validation
          skipped++;
        }
      } catch (e) {
        errors++;
        print('❌ Error importing line ${rowIndex + 1}: $e');
      }
    }

    print('✅ Import completed: $imported imported, $skipped skipped, $errors errors');

    // Remove the source file once every row has been handled
    await csvFile.delete();

    // Send completion notification
    final summary = {
      'file': csvFilePath,
      'imported': imported,
      'skipped': skipped,
      'errors': errors,
    };
    await NotificationService.send('Data import completed', summary);
  }
}

// Data cleanup job
/// Queue job that deletes logs, sessions, and read notifications older
/// than a given age.
class CleanupOldDataJob extends QueueJob {
  CleanupOldDataJob(this.olderThan);

  /// Minimum age a row must have to be deleted.
  final Duration olderThan;

  @override
  String get displayName => 'Cleanup Data Older Than ${olderThan.inDays} Days';

  @override
  int get maxRetries => 2;

  /// Runs the three deletions in order, prints the results, and writes an
  /// audit entry.
  @override
  Future<void> handle() async {
    final cutoff = DateTime.now().subtract(olderThan);

    // Logs created before the cutoff
    final logQuery = Database.table('logs').where('created_at', '<', cutoff);
    final logsRemoved = await logQuery.delete();

    // Sessions whose last activity is before the cutoff
    final sessionQuery =
        Database.table('sessions').where('last_activity', '<', cutoff);
    final sessionsRemoved = await sessionQuery.delete();

    // Notifications created before the cutoff that have been read
    final notificationQuery = Database.table('notifications')
        .where('created_at', '<', cutoff)
        .where('read', true);
    final notificationsRemoved = await notificationQuery.delete();

    print('🧹 Cleanup completed:');
    print('   Logs deleted: $logsRemoved');
    print('   Sessions deleted: $sessionsRemoved');
    print('   Notifications deleted: $notificationsRemoved');

    // Record the cleanup in the audit log
    await AuditLogger.log('data_cleanup', {
      'older_than_days': olderThan.inDays,
      'logs_deleted': logsRemoved,
      'sessions_deleted': sessionsRemoved,
      'notifications_deleted': notificationsRemoved,
    });
  }
}

Scheduled Tasks

dart
// Daily report job
/// Queue job that generates yesterday's report, stores it, emails it, and
/// archives reports older than 30 days.
class GenerateDailyReportJob extends QueueJob {
  @override
  Future<void> handle() async {
    final yesterday = DateTime.now().subtract(Duration(days: 1));
    final reportData = await generateReport(yesterday);

    // Save report to database
    final report = await Report.create({
      'date': yesterday,
      'data': reportData,
      'generated_at': DateTime.now(),
    });

    // Send report via email
    await EmailService.sendReport(report);

    // Archive old reports (older than 30 days)
    await Report.where('date', '<', DateTime.now().subtract(Duration(days: 30)))
      .update({'archived': true});

    // DateTime has no toDateString(); take the date part (YYYY-MM-DD) of
    // the ISO-8601 form instead.
    final reportDate = yesterday.toIso8601String().split('T').first;
    print('✅ Daily report generated for $reportDate');
  }

  @override
  String get displayName => 'Generate Daily Report';

  @override
  int get maxRetries => 3;
}

// Schedule daily reports
/// Schedules [GenerateDailyReportJob] runs on a [QueueManager].
class ReportScheduler {
  final QueueManager _queueManager;

  ReportScheduler(this._queueManager);

  /// Dispatches one report job delayed until 09:00:00.000 tomorrow, local
  /// time.
  void scheduleDailyReports() {
    // Read the clock once so the target time and the delay share the same
    // reference instant.
    final now = DateTime.now();

    // Build the target from calendar fields. The constructor normalizes
    // `day + 1` across month/year ends and zeroes every sub-hour field.
    // Adding 24 hours instead can land on the wrong date when a DST change
    // falls in between, and would keep the current milliseconds.
    final tomorrow9AM = DateTime(now.year, now.month, now.day + 1, 9);

    final initialDelay = tomorrow9AM.difference(now);

    // NOTE(review): the returned future is not awaited because this method
    // is synchronous, so a dispatch error would go unobserved — confirm
    // that is acceptable.
    _queueManager.dispatch(
      GenerateDailyReportJob(),
      delay: initialDelay,
    );

    // Schedule recurring reports (this would be handled by a cron-like system)
    // For now, each report schedules the next one
  }
}

// Maintenance job
/// Queue job that runs a fixed sequence of database maintenance steps and
/// logs completion.
class DatabaseMaintenanceJob extends QueueJob {
  @override
  String get displayName => 'Database Maintenance';

  @override
  int get maxRetries => 2;

  /// Runs the four maintenance steps one after another, then writes an
  /// info-level log entry.
  @override
  Future<void> handle() async {
    print('🔧 Starting database maintenance...');

    await Database.optimizeTables(); // Optimize tables
    await Database.analyzeTables(); // Update statistics
    await Database.cleanupOrphanedRecords(); // Clean up orphaned records
    await Database.rebuildIndexes(); // Rebuild indexes

    print('✅ Database maintenance completed');

    // Record that maintenance finished
    final logDetails = {
      'timestamp': DateTime.now().toIso8601String(),
      'duration': 'completed',
    };
    await SystemLogger.info('Database maintenance completed', logDetails);
  }
}

// Schedule a maintenance run seven days from now.
// NOTE(review): this dispatches a single delayed job; nothing visible here
// repeats it weekly — confirm how recurrence is meant to be handled.
final maintenanceDelay = Duration(days: 7);
await queueManager.dispatch(
  DatabaseMaintenanceJob(),
  delay: maintenanceDelay,
);

Custom Drivers

dart
// Custom database queue driver
/// A [QueueDriver] that stores jobs as rows in a `jobs` database table.
///
/// Rows move through the statuses `pending` → `processing` → `completed`,
/// or end as `failed` once the retry budget is used up.
class DatabaseQueueDriver implements QueueDriver {
  final DatabaseConnection _db;

  /// Counter folded into generated ids so that jobs pushed within the same
  /// clock tick (for example by a batch dispatch) still get distinct ids.
  static int _idSequence = 0;

  DatabaseQueueDriver(this._db);

  /// Inserts [job] as a pending row scheduled for now plus [delay].
  @override
  Future<void> push(QueueJob job, {Duration? delay}) async {
    final now = DateTime.now();
    final scheduledAt = now.add(delay ?? Duration.zero);

    await _db.table('jobs').insert({
      'id': _nextId(now),
      'type': job.runtimeType.toString(),
      'payload': jsonEncode(job.toJson()),
      'scheduled_at': scheduledAt.toIso8601String(),
      'created_at': now.toIso8601String(),
      'attempts': 0,
      'max_retries': job.maxRetries,
      'status': 'pending',
    });

    print('🗄️ Job queued to database: ${job.displayName}');
  }

  /// Runs the oldest pending job whose scheduled time has passed, if any.
  ///
  /// On failure the row is either rescheduled with a delay of
  /// `30s × attempts` or marked `failed` once its retries are used up.
  @override
  Future<void> process() async {
    // Find next pending job
    final jobRecord = await _db.table('jobs')
      .where('status', 'pending')
      .where('scheduled_at', '<=', DateTime.now().toIso8601String())
      .orderBy('created_at')
      .first();

    if (jobRecord == null) return;

    // Mark as processing
    // NOTE(review): the select above and this update are separate
    // statements, so two workers could presumably pick the same row —
    // confirm whether the database layer offers an atomic claim.
    await _db.table('jobs')
      .where('id', jobRecord['id'])
      .update({'status': 'processing'});

    try {
      // Create job instance and execute
      final job = _createJobFromRecord(jobRecord);
      await job.handle();

      // Mark as completed
      await _db.table('jobs')
        .where('id', jobRecord['id'])
        .update({'status': 'completed'});

      print('🗄️ Job completed: ${job.displayName}');

    } catch (e) {
      // Handle failure and retry logic
      final attempts = jobRecord['attempts'] + 1;
      final maxRetries = jobRecord['max_retries'];

      // `attempts` counts executions and the first one is not a retry, so
      // the budget is exhausted only when attempts exceeds maxRetries.
      // With `>=`, a job with maxRetries: 1 would never be retried.
      if (attempts > maxRetries) {
        await _db.table('jobs')
          .where('id', jobRecord['id'])
          .update({
            'status': 'failed',
            // Keep the final attempt count on the failed row as well.
            'attempts': attempts,
            'error': e.toString(),
          });
        print('🗄️ Job failed permanently: ${jobRecord['type']}');
      } else {
        // Schedule retry
        final retryDelay = Duration(seconds: 30 * attempts);
        final nextAttempt = DateTime.now().add(retryDelay);

        await _db.table('jobs')
          .where('id', jobRecord['id'])
          .update({
            'status': 'pending',
            'attempts': attempts,
            'scheduled_at': nextAttempt.toIso8601String(),
          });
        print('🗄️ Job retry scheduled: ${jobRecord['type']}');
      }
    }
  }

  /// Builds a digits-only id from the microsecond timestamp of [now] plus a
  /// three-digit rolling sequence number.
  String _nextId(DateTime now) {
    final sequence = (_idSequence++ % 1000).toString().padLeft(3, '0');
    return '${now.microsecondsSinceEpoch}$sequence';
  }

  /// Rebuilds a job instance from a stored row.
  QueueJob _createJobFromRecord(Map<String, dynamic> record) {
    // This would need a job registry/factory
    // Simplified version for demonstration
    final payload = jsonDecode(record['payload']);
    return GenericQueueJob.fromPayload(record['type'], payload);
  }
}

// Register custom driver under the name 'database'
QueueFactory.registerDriver('database', DatabaseQueueDriver(database));

// Use custom driver: the 'driver' value matches the name registered above
final config = {
  'queue': {
    'driver': 'database',
  },
};
final queueManager = QueueManager(config);

Driver Interface

  • push(job, delay) - Add job to queue
  • process() - Process next available job
  • Implement proper error handling
  • Support delayed job execution

Best Practices

✅ Do's

  • Use appropriate drivers for your environment (memory for dev, file/redis for prod)
  • Set reasonable retry limits to prevent infinite retry loops
  • Monitor queue metrics and set up alerts for failures
  • Use delayed jobs for scheduled tasks and background processing
  • Handle errors gracefully in job implementations
  • Use batch processing for bulk operations
  • Configure worker timeouts to prevent hanging jobs
  • Log important job events for debugging
  • Test job implementations thoroughly
  • Use descriptive job names and proper error messages

❌ Don'ts

  • Don't perform long-running operations without proper timeouts
  • Don't rely on memory driver for production (jobs lost on restart)
  • Don't ignore job failures - always handle errors appropriately
  • Don't create jobs with side effects that can't be retried safely
  • Don't use synchronous driver for background processing
  • Don't forget to start queue workers in production
  • Don't use very short delays that could overwhelm the system
  • Don't log sensitive information in job error messages
  • Don't create jobs that depend on external state that may change
  • Don't forget to monitor queue health and performance

Performance Considerations

Optimization Tips

  • Memory driver is fastest but loses jobs on restart
  • File driver provides persistence with reasonable performance
  • Use appropriate worker delays to balance throughput and resource usage
  • Configure proper timeouts to prevent hanging jobs
  • Monitor memory usage with large job payloads
  • Consider job size limits for very large payloads

Resource Management

  • Set reasonable worker limits to prevent resource exhaustion
  • Use background workers for long-running applications
  • Configure proper error handling to prevent worker crashes
  • Monitor CPU and memory usage of queue workers
  • Use connection pooling for database/file operations

Testing Queue Jobs

dart
// Unit tests for queue jobs
/// Test suite for dispatching and processing queue jobs on the memory
/// driver.
void main() {
  group('QueueJob Tests', () {
    late QueueManager queueManager;

    // Every test gets a fresh manager on the memory driver.
    setUp(() async {
      final config = {
        'queue': {'driver': 'memory'},
      };
      queueManager = QueueManager(config);
      await queueManager.init();
    });

    test('Simple job executes successfully', () async {
      final simpleJob = TestJob();
      await queueManager.dispatch(simpleJob);

      // One processing pass runs the job
      await queueManager.process();

      expect(simpleJob.executed, isTrue);
      expect(simpleJob.executionCount, equals(1));
    });

    test('Job with retry logic', () async {
      final flakyJob = FailingJob(maxFailures: 2);
      await queueManager.dispatch(flakyJob);

      // Three passes: two failures followed by a success
      await queueManager.process(); // Fail 1
      await queueManager.process(); // Fail 2
      await queueManager.process(); // Success

      expect(flakyJob.executionCount, equals(3));
      expect(flakyJob.succeeded, isTrue);
    });

    test('Delayed job execution', () async {
      final delayedJob = TestJob();
      final delay = Duration(seconds: 1);

      await queueManager.dispatch(delayedJob, delay: delay);

      // Before the delay has elapsed the job must not run
      await queueManager.process();
      expect(delayedJob.executed, isFalse);

      // After the delay (plus a small margin) it runs
      final margin = Duration(milliseconds: 100);
      await Future.delayed(delay + margin);
      await queueManager.process();
      expect(delayedJob.executed, isTrue);
    });

    test('Batch job processing', () async {
      final batch = [
        TestJob(),
        TestJob(),
        TestJob(),
      ];

      await queueManager.dispatchBatch(batch);

      // One processing pass per queued job
      var remaining = batch.length;
      while (remaining > 0) {
        await queueManager.process();
        remaining--;
      }

      for (final queued in batch) {
        expect(queued.executed, isTrue);
      }
    });

    test('Worker processes jobs correctly', () async {
      final queuedJobs = [for (var i = 0; i < 5; i++) TestJob()];
      for (final queued in queuedJobs) {
        await queueManager.dispatch(queued);
      }

      // Worker with a short pause between jobs
      await queueManager.startWorker(
        delay: Duration(milliseconds: 10),
        maxJobs: 5,
      );

      // Give the worker time to finish every job
      await Future.delayed(Duration(milliseconds: 100));

      for (final queued in queuedJobs) {
        expect(queued.executed, isTrue);
      }
    });

    test('Error handling in jobs', () async {
      final brokenJob = AlwaysFailingJob();
      await queueManager.dispatch(brokenJob);

      // A single pass runs the job once and it fails
      await queueManager.process();

      expect(brokenJob.executionCount, equals(1));
      expect(brokenJob.failed, isTrue);
    });
  });
}

// Test job implementations
/// Minimal job for tests: records that it ran and how many times.
class TestJob extends QueueJob {
  /// Becomes `true` the first time [handle] is invoked.
  bool executed = false;

  /// Number of times [handle] has been invoked.
  int executionCount = 0;

  @override
  Future<void> handle() async {
    executionCount += 1;
    executed = true;

    // Short pause standing in for real work.
    await Future.delayed(const Duration(milliseconds: 10));
  }

  @override
  String get displayName => 'Test Job';

  @override
  int get maxRetries => 3;
}

/// Test job that throws on its first [maxFailures] runs and succeeds on the
/// run after that.
class FailingJob extends QueueJob {
  /// How many initial runs end in an exception.
  final int maxFailures;

  /// Number of times [handle] has been invoked.
  int executionCount = 0;

  /// Set to `true` once a run completes without throwing.
  bool succeeded = false;

  FailingJob({this.maxFailures = 2});

  @override
  Future<void> handle() async {
    final attempt = ++executionCount;

    if (attempt > maxFailures) {
      succeeded = true;
      return;
    }

    throw Exception('Simulated failure #$attempt');
  }

  @override
  String get displayName => 'Failing Job';

  @override
  int get maxRetries => 5;
}

/// Test job whose handler throws on every run and declares zero retries.
class AlwaysFailingJob extends QueueJob {
  /// Number of times [handle] has been invoked.
  int executionCount = 0;

  /// Set to `true` as soon as [handle] has been entered once.
  bool failed = false;

  @override
  Future<void> handle() async {
    failed = true;
    executionCount += 1;
    throw Exception('This job always fails');
  }

  @override
  String get displayName => 'Always Failing Job';

  @override
  int get maxRetries => 0;
}

Testing Strategies

  • Test job execution in isolation
  • Mock external dependencies
  • Test error handling and retry logic
  • Test delayed job scheduling
  • Test batch job processing
  • Verify job serialization/deserialization

Troubleshooting

Common Issues

  • Jobs not processing: Check that workers are running and the driver is configured
  • Jobs lost on restart: Use a persistent driver (file/redis) instead of the memory driver
  • High memory usage: Monitor job payload sizes and worker limits
  • Jobs timing out: Increase the timeout or optimize job execution
  • Worker crashes: Add proper error handling and logging
  • Delayed jobs not executing: Check the system clock and delay calculations

Debug Commands

dart
/// Prints a snapshot of the queue manager's metrics for debugging.
///
/// Reads the map returned by `getMetrics()` and prints the driver name, the
/// queued/processing/completed/failed counters and the success rate as a
/// percentage. When a `queued_by_type` entry is present, its per-type counts
/// are listed as well.
void debugQueueStatus(QueueManager queueManager) {
  final stats = queueManager.getMetrics();

  print('🔍 Queue Debug Information:');
  print('Driver: ${queueManager.defaultDriverName}');
  print('Total Queued: ${stats['total_queued']}');
  print('Currently Processing: ${stats['currently_processing']}');
  print('Completed: ${stats['total_completed']}');
  print('Failed: ${stats['total_failed']}');
  print('Success Rate: ${(stats['success_rate'] * 100).toStringAsFixed(1)}%');

  final queuedByType = stats['queued_by_type'];
  if (queuedByType == null) {
    return;
  }

  print('Jobs by Type:');
  for (final entry in (queuedByType as Map<String, dynamic>).entries) {
    print('  ${entry.key}: ${entry.value}');
  }
}

/// Prints a human-readable summary of the JSON queue file at [queuePath].
///
/// The file is expected to contain a JSON list of job objects. At most the
/// first ten jobs are printed (type, id, scheduled time, whether that time is
/// already in the past, and attempts versus max retries), followed by a count
/// of the jobs that were not shown. A missing file, or any error while
/// reading or decoding, is reported with a printed message instead of being
/// thrown.
void inspectFileQueue(String queuePath) {
  const previewLimit = 10;
  final queueFile = File(queuePath);

  if (!queueFile.existsSync()) {
    print('❌ Queue file does not exist: $queuePath');
    return;
  }

  try {
    final entries = jsonDecode(queueFile.readAsStringSync()) as List<dynamic>;

    print('📁 File Queue Contents (${entries.length} jobs):');

    var position = 0;
    for (final entry in entries.take(previewLimit)) {
      position += 1;
      final job = entry as Map<String, dynamic>;
      final dueAt = DateTime.parse(job['scheduledAt']);
      // A job counts as due once its scheduled time is strictly in the past.
      final dueMarker = dueAt.isBefore(DateTime.now()) ? '✅' : '⏰';

      print('$position. ${job['type']}');
      print('   ID: ${job['id']}');
      print('   Scheduled: $dueAt');
      print('   Due: $dueMarker');
      print('   Attempts: ${job['attempts']}/${job['maxRetries']}');
      print('');
    }

    final hiddenCount = entries.length - previewLimit;
    if (hiddenCount > 0) {
      print('... and $hiddenCount more jobs');
    }
  } catch (e) {
    // Any failure (I/O, malformed JSON, unexpected entry shape) ends the
    // listing with a single diagnostic line.
    print('❌ Failed to read queue file: $e');
  }
}

/// Removes jobs stuck in the `processing` state from the file-driver queue.
///
/// Reads the JSON job list at [queuePath], drops every entry whose `status`
/// is `'processing'` and whose `scheduledAt` timestamp lies more than
/// [maxProcessingTime] in the past, then writes the remaining list back and
/// prints a confirmation. Does nothing when the file does not exist.
///
/// Age is measured from `scheduledAt` because that is the only timestamp
/// this function reads. Entries that are not JSON objects, or whose
/// `scheduledAt` is missing or unparseable, are kept rather than aborting
/// the cleanup.
///
/// [queueManager] is not used by this file-based implementation; how stuck
/// jobs are cleared depends on the driver in use.
Future<void> clearStuckJobs(
  QueueManager queueManager, {
  String queuePath = 'storage/queue/jobs.json',
  Duration maxProcessingTime = const Duration(minutes: 30),
}) async {
  final file = File(queuePath);
  if (!file.existsSync()) {
    return;
  }

  // Synchronous I/O is used on purpose: with no `await` between the read and
  // the write, no other code in this isolate can run in between them.
  final jobs = jsonDecode(file.readAsStringSync()) as List<dynamic>;
  final now = DateTime.now();

  jobs.removeWhere((job) {
    if (job is! Map || job['status'] != 'processing') {
      return false;
    }
    final rawScheduledAt = job['scheduledAt'];
    final scheduledAt =
        rawScheduledAt is String ? DateTime.tryParse(rawScheduledAt) : null;
    // Without a readable timestamp the age cannot be judged, so keep the job.
    return scheduledAt != null &&
        now.difference(scheduledAt) > maxProcessingTime;
  });

  // `writeAsStringSync` returns void, so there is nothing to await here.
  file.writeAsStringSync(jsonEncode(jobs));
  print('🧹 Cleared stuck jobs from file queue');
}

/// Periodically inspects queue metrics and prints warnings when thresholds
/// are exceeded.
///
/// Each check reads `getMetrics()` from the wrapped queue manager and warns
/// when the failure rate is above 5%, more than 20 jobs are processing at
/// once, or more than 1000 jobs are queued. It then prints a one-line summary
/// of completed and failed jobs.
class QueueHealthMonitor {
  final QueueManager _queueManager;

  /// Handle of the active periodic check, or `null` while monitoring is off.
  Timer? _timer;

  QueueHealthMonitor(this._queueManager);

  /// Whether a periodic health check is currently scheduled.
  bool get isMonitoring => _timer?.isActive ?? false;

  /// Starts a health check that runs once per minute.
  ///
  /// Calling this while monitoring is already active replaces the existing
  /// schedule, so repeated calls never stack several timers.
  void startMonitoring() {
    stopMonitoring();
    _timer = Timer.periodic(
      const Duration(minutes: 1),
      (_) => _checkHealth(),
    );
  }

  /// Cancels the periodic health check; does nothing if it is not running.
  void stopMonitoring() {
    _timer?.cancel();
    _timer = null;
  }

  /// Runs one health check against the current metrics.
  void _checkHealth() {
    final metrics = _queueManager.getMetrics();

    // Check for warning conditions
    if (metrics['failure_rate'] > 0.05) {
      // > 5% failure rate
      print(
        '⚠️  WARNING: High failure rate ${(metrics['failure_rate'] * 100).toStringAsFixed(1)}%',
      );
    }

    if (metrics['currently_processing'] > 20) {
      print(
        '⚠️  WARNING: High concurrent processing: ${metrics['currently_processing']}',
      );
    }

    if (metrics['total_queued'] > 1000) {
      print('⚠️  WARNING: Large queue backlog: ${metrics['total_queued']} jobs');
    }

    // Log periodic health check
    print(
      '💚 Queue Health: ${metrics['total_completed']} completed, ${metrics['total_failed']} failed',
    );
  }
}

On this page