Queue System

Process time-consuming tasks asynchronously in the background with Khadem's flexible queue system. Support for multiple storage backends, automatic retries, delayed execution, and comprehensive monitoring keeps your application responsive while handling heavy workloads reliably.

Background Processing · Multiple Drivers · Auto-Retry · Job Monitoring · Delayed Execution · Scalable

Introduction

The queue system lets you defer time-consuming tasks (such as sending emails, processing uploads, generating reports, or calling external APIs) so they run later in the background. This keeps your application responsive: users do not have to wait for slow operations to complete.

Why Use Queues?

  • Improved Response Times: Return responses immediately while processing work in the background
  • Better Resource Management: Control concurrency and prevent overwhelming your system
  • Reliability: Automatic retry logic handles transient failures (network issues, rate limits, etc.)
  • Scheduling: Delay job execution to a specific time or after a duration
  • Isolation: Failed jobs don't affect other jobs or your main application
  • Scalability: Easily scale by adding more worker processes

How It Works

  1. Create a Job: Extend QueueJob and implement the handle() method
  2. Dispatch the Job: Call queueManager.dispatch(job) to add it to the queue
  3. Worker Processes Jobs: A worker continuously fetches and executes jobs from the queue
  4. Job Execution: The job's handle() method runs with automatic retry on failure

Quick Start

Getting started with queues is simple: create a job class, dispatch it, and start a worker. The queue manager handles all the complexity of job storage, retrieval, and execution. No job registration required!

dart
// Import queue system (all organized modules included)
import 'package:khadem/src/core/queue/index.dart';

// Or import specific modules
import 'package:khadem/src/core/queue/queue_manager.dart';
import 'package:khadem/src/core/queue/drivers/in_memory_driver.dart';
import 'package:khadem/src/contracts/queue/queue_job.dart';

// Initialize queue manager with config
final config = Config(); // Your config implementation
final queueManager = QueueManager(config);

// Load configuration and register drivers
queueManager.loadFromConfig();

// Or manually register a driver
final memoryDriver = InMemoryDriver(config: DriverConfig(name: 'memory'));
queueManager.registerDriver('memory', memoryDriver);

// Create and dispatch a simple job
class WelcomeEmailJob extends QueueJob {
  final String userEmail;
  final String userName;

  WelcomeEmailJob(this.userEmail, this.userName);

  @override
  Future<void> handle() async {
    print('Sending welcome email to $userName at $userEmail');
    // Send email logic here
    await Future.delayed(Duration(seconds: 1)); // Simulate work
    print('✅ Welcome email sent!');
  }

  @override
  String get displayName => 'Send Welcome Email';

  @override
  int get maxRetries => 3;
}

// Dispatch the job
final job = WelcomeEmailJob('user@example.com', 'John Doe');
await queueManager.dispatch(job);

// Start a worker to process jobs
await queueManager.startWorker(
  delay: Duration(seconds: 2),
  onJobComplete: (job, result) => print('Job completed: ${job.displayName}'),
  onJobError: (job, error, stack) => print('Job failed: ${job.displayName} - $error'),
);

💡 Understanding the Flow

  1. Initialize: Create a QueueManager and optionally call loadFromConfig() to load drivers from configuration
  2. Define Jobs: Create job classes that extend QueueJob - no registration needed!
  3. Dispatch: Call queueManager.dispatch(job) to add jobs to the queue
  4. Start Worker: Call startWorker() to begin processing jobs in the background

Key Difference from Events: Queued jobs are stored by the driver until a worker runs them. With a persistent driver (file or Redis), jobs survive application restarts; with the memory driver they are lost. Because a failed or interrupted job can be retried, a job may run more than once, so design jobs to be idempotent.

Creating Jobs

Jobs are simple classes that extend QueueJob and implement the handle() method. The handle() method contains your job's logic: sending an email, processing a file, updating records, and so on. You can override various properties to customize retry behavior, timeouts, and display names.

dart
// Import required classes
import 'dart:math'; // Random, used below to simulate gateway failures

import 'package:khadem/src/contracts/queue/queue_job.dart';
import 'package:khadem/src/core/queue/serialization/index.dart';

// Simple job class
class ProcessPaymentJob extends QueueJob {
  final String orderId;
  final double amount;

  ProcessPaymentJob(this.orderId, this.amount);

  @override
  Future<void> handle() async {
    // The literal dollar sign must be escaped inside a Dart string
    print('Processing payment for order $orderId: \$${amount.toStringAsFixed(2)}');

    // Simulate payment processing
    await Future.delayed(Duration(seconds: 2));

    if (Random().nextBool()) {
      throw Exception('Payment gateway timeout');
    }

    print('✅ Payment processed successfully');
  }

  @override
  String get displayName => 'Process Payment';

  @override
  int get maxRetries => 5;
}

// Job with custom serialization (for persistent queues)
class SendNotificationJob extends QueueJob with SerializableJob {
  final int userId;
  final String message;
  final NotificationType type;

  SendNotificationJob(this.userId, this.message, this.type);

  @override
  Future<void> handle() async {
    print('Sending $type notification to user $userId: $message');

    // Send notification logic
    await Future.delayed(Duration(milliseconds: 500));

    print('✅ Notification sent');
  }

  @override
  String get displayName => 'Send ${type.name} Notification';

  @override
  int get maxRetries => 3;

  // Custom serialization for complex objects
  @override
  Map<String, dynamic> toJson() => {
    'userId': userId,
    'message': message,
    'type': type.name,
  };

  static SendNotificationJob fromJson(Map<String, dynamic> json) {
    return SendNotificationJob(
      json['userId'],
      json['message'],
      NotificationType.values.firstWhere((e) => e.name == json['type']),
    );
  }
}

enum NotificationType { email, sms, push }

QueueJob Properties

handle() - Required

The main method that contains your job logic. This is called when the worker processes the job.

displayName - Optional (defaults to class name)

Human-readable name for logging and debugging. Shown in worker callbacks and metrics.

maxRetries - Optional (defaults to 3)

Number of times to retry the job if it fails. Set to 0 to disable retries.

retryDelay - Optional (defaults to 30 seconds)

How long to wait before retrying a failed job. Useful for implementing exponential backoff.

shouldRetry - Optional (defaults to true)

Whether this job should be retried on failure. Set to false for one-shot jobs.

timeout - Optional (defaults to null/no timeout)

Maximum execution time. Job will be killed if it exceeds this duration.

queue - Optional (defaults to 'default')

Queue name for this job. Use different queues to prioritize different job types.
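
The optional properties above can be combined on a single job. The sketch below uses a stand-in base class, `QueueJobSketch`, rather than the Khadem `QueueJob`, so that it runs on its own. It mirrors the documented names and defaults, but the property types (`Duration`, `bool`, `String`) are assumptions inferred from the descriptions, and `GenerateReportJob` is a hypothetical job.

```dart
// Stand-in base class for illustration only. It mirrors the property names
// and defaults documented above; the types are assumptions.
abstract class QueueJobSketch {
  Future<void> handle();
  String get displayName => runtimeType.toString();
  int get maxRetries => 3;
  Duration get retryDelay => const Duration(seconds: 30);
  bool get shouldRetry => true;
  Duration? get timeout => null;
  String get queue => 'default';
}

// Hypothetical job that overrides several optional properties.
class GenerateReportJob extends QueueJobSketch {
  final String reportId;

  GenerateReportJob(this.reportId);

  @override
  Future<void> handle() async {
    // Report generation would go here.
  }

  @override
  String get displayName => 'Generate Report $reportId';

  @override
  int get maxRetries => 5; // Retry up to five times

  @override
  Duration get retryDelay => const Duration(minutes: 2); // Wait between retries

  @override
  Duration? get timeout => const Duration(minutes: 10); // Upper bound per run

  @override
  String get queue => 'reports'; // Keep slow reports off the default queue
}
```

Properties that are not overridden, such as shouldRetry here, keep their defaults.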

⚠️ Important: Job Design Principles

  • Idempotent: Jobs should produce the same result if run multiple times (in case of retry)
  • Self-Contained: Jobs should have all the data they need passed to the constructor
  • Fast Failing: Detect errors early and throw exceptions to trigger retries
  • Atomic: Either complete fully or fail fully - avoid partial states
  • Logged: Include logging to track progress and diagnose failures
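
As a rough illustration of the idempotency principle, the sketch below guards a side effect with a record of completed work, so a retried job does not repeat it. All names here (`ProcessedOrders`, `chargeOnce`, the `charge` callback) are hypothetical and not part of Khadem. The in-memory set is a simplification; a real application would keep this record in durable storage.

```dart
// Hypothetical record of completed work. A real application would keep
// this in a database so that it survives restarts.
class ProcessedOrders {
  final Set<String> _done = {};

  bool contains(String orderId) => _done.contains(orderId);

  void markDone(String orderId) {
    _done.add(orderId);
  }
}

// Charges an order at most once, even if the job runs again after a retry.
// `charge` stands in for the real payment call. Returns true if the charge
// was performed by this call, false if an earlier run already did it.
bool chargeOnce(
  String orderId,
  ProcessedOrders processed,
  void Function(String orderId) charge,
) {
  if (processed.contains(orderId)) {
    return false; // Already handled by an earlier run
  }
  // If charge throws, markDone is skipped, so a retry will try again.
  charge(orderId);
  processed.markDone(orderId);
  return true;
}
```

A crash between the charge and markDone would still repeat the charge on retry, which is one reason to pair this pattern with a deduplication key on the receiving side when the external service supports one.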

Queue Drivers

Khadem supports multiple queue drivers, each optimized for different use cases. The driver determines how and where jobs are stored. You can switch drivers by changing configuration, and your job code stays exactly the same.

💾 Memory Driver

Stores jobs in memory for blazing-fast performance. Perfect for development and testing, but jobs are lost on application restart. Use this when you don't need persistence or are developing locally.

dart
// Import drivers
import 'package:khadem/src/core/queue/drivers/in_memory_driver.dart';

// Memory driver - fast but loses jobs on restart
// Using configuration file (config/queue.yaml or JSON)
final config = {
  'queue': {
    'default': 'memory',
    'drivers': {
      'memory': {
        'driver': 'memory',
        'track_metrics': true,
        'use_dlq': true,
        'max_retries': 3,
      },
    },
  },
};

// Or directly create and register driver
final driver = InMemoryDriver(
  config: DriverConfig(
    name: 'memory',
    trackMetrics: true,
    useDLQ: true,
  ),
);

final queueManager = QueueManager(config);
queueManager.registerDriver('memory', driver);

// Jobs are stored in memory only
await queueManager.dispatch(WelcomeEmailJob('user@example.com', 'John'));

// ⚠️  Jobs will be lost if application restarts
// ✅ Perfect for development and testing
// ✅ Fastest performance - O(1) operations

Use When: Development, testing, temporary jobs, high-performance scenarios where persistence isn't required

📁 File Driver

Persists jobs to disk in JSON format. Jobs survive application restarts, making this suitable for production use on single servers. The driver handles file locking and atomic writes to prevent corruption.

dart
// Import file driver
import 'package:khadem/src/core/queue/drivers/file_storage_driver.dart';
import 'package:khadem/src/core/queue/registry/index.dart'; // For job registry

// File driver - persists jobs to disk
final config = {
  'queue': {
    'default': 'file',
    'drivers': {
      'file': {
        'driver': 'file',
        'path': './storage/queue',
        'track_metrics': true,
        'use_dlq': true,
      },
    },
  },
};

// Or directly create driver
final driver = FileStorageDriver(
  config: DriverConfig(name: 'file'),
  storagePath: './storage/queue',
);

final queueManager = QueueManager(config);
queueManager.loadFromConfig();

// Jobs are saved to file and survive restarts
await queueManager.dispatch(ImportantJob());

// File structure:
// storage/queue/main.json
// {
//   "jobs": [
//     {
//       "id": "123456789",
//       "type": "ImportantJob",
//       "payload": {...},
//       "scheduledFor": "2024-01-15T10:30:00.000Z",
//       "attempts": 0,
//       "status": "pending"
//     }
//   ]
// }
Use When: Production (single server), reliable persistence needed, simple deployments without external dependencies

⚑ Sync Driver

Executes jobs immediately in the current process without any queuing. Useful for testing or when you want queue-like APIs but need synchronous execution. This is essentially a no-op driver that just calls handle() immediately.

dart
// Import sync driver
import 'package:khadem/src/core/queue/drivers/synchronous_driver.dart';

// Sync driver - processes jobs immediately
final config = {
  'queue': {
    'default': 'sync',
    'drivers': {
      'sync': {
        'driver': 'sync',
      },
    },
  },
};

// Or directly create driver
final driver = SynchronousDriver(
  config: DriverConfig(name: 'sync'),
);

final queueManager = QueueManager(config);
queueManager.registerDriver('sync', driver);

// Jobs execute immediately (no queuing)
await queueManager.dispatch(QuickJob());
// Output: Job executed immediately!

// ✅ No background processing needed
// ✅ Perfect for simple synchronous operations
// ✅ Respects job delays with Future.delayed
// ❌ No actual queuing - jobs run in main thread

Use When: Testing, debugging, simple synchronous operations, or temporarily disabling background processing

🚀 Redis Driver

High-performance Redis-backed queue driver perfect for production environments with multiple workers or servers. Provides persistence, atomic operations, and excellent scalability. Requires Redis server (version 5.0+) running locally or remotely.

dart
// Import Redis driver
import 'package:khadem/src/core/queue/drivers/redis_storage_driver.dart';
import 'package:khadem/src/core/queue/registry/index.dart'; // For job registry

// Redis driver - high-performance persistent queue
final config = {
  'queue': {
    'default': 'redis',
    'drivers': {
      'redis': {
        'driver': 'redis',
        'host': 'localhost',
        'port': 6379,
        'password': null, // Optional password
        'queue_name': 'default',
        'track_metrics': true,
        'use_dlq': true,
      },
    },
  },
};

// Or directly create driver
final driver = RedisStorageDriver(
  config: DriverConfig(name: 'redis'),
  host: 'localhost',
  port: 6379,
  password: null, // Optional
  queueName: 'default',
);

final queueManager = QueueManager(config);
queueManager.registerDriver('redis', driver);

// Jobs are stored in Redis with atomic operations
await queueManager.dispatch(ImportantJob());

// ✅ Persistent storage (survives restarts)
// ✅ Atomic operations (no race conditions)
// ✅ Multi-worker support (horizontal scaling)
// ✅ Very fast (in-memory with optional persistence)
// ⚠️  Requires Redis server running

// Quick Redis setup with Docker:
// docker run -d -p 6379:6379 --name khadem-redis redis:alpine

Use When: High-scale production, multiple workers, distributed systems, need for persistence with high performance

📊 Driver Comparison

| Driver | Persistence | Performance | Scalability | Use Case |
|--------|-------------|-------------|-------------|----------|
| Memory | ❌ None | ⚡ Fastest | Single Process | Development, Testing |
| File | ✅ Disk | 🟢 Fast | Single Server | Production (Small-Medium) |
| Sync | N/A | ⚡ Immediate | N/A | Testing, Debugging |
| Redis | ✅ In-Memory DB | 🚀 Very Fast | Multi-Server | Production (High Scale) |

💡 Recommendation: Start with Memory for development and testing. Use File driver for production on single servers with moderate load. Switch to Redis when you need to scale horizontally with multiple worker servers or require high-performance persistent queuing with atomic operations.

Configuration

Queue configuration is flexible and supports multiple approaches. You can configure drivers programmatically by registering them, or use configuration files (JSON/YAML) that are loaded automatically. The QueueManager constructor accepts a ConfigInterface implementation that defines your queue settings.

dart
// Configuration file structure (config/queue.yaml or JSON)
final config = {
  'queue': {
    'default': 'memory', // Default driver to use
    'drivers': {
      'memory': {
        'driver': 'memory',
        'track_metrics': true,
        'use_dlq': true,
        'max_retries': 3,
      },
      'file': {
        'driver': 'file',
        'path': './storage/queue',
        'track_metrics': true,
        'use_dlq': true,
      },
      'sync': {
        'driver': 'sync',
      },
      'redis': {
        'driver': 'redis',
        'host': 'localhost',
        'port': 6379,
        'password': null,
        'queue_name': 'default',
        'track_metrics': true,
        'use_dlq': true,
      },
    },
  },
};

// Initialize with configuration
final queueManager = QueueManager(config);

// Load config and auto-register all drivers
queueManager.loadFromConfig();

// Or manually register drivers
final memoryDriver = InMemoryDriver(
  config: DriverConfig(
    name: 'memory',
    trackMetrics: true,
    useDLQ: true,
    maxRetries: 3,
    defaultJobTimeout: Duration(minutes: 5),
  ),
);
queueManager.registerDriver('memory', memoryDriver);

// Switch default driver
queueManager.setDefaultDriver('memory');

// Get a specific driver
final fileDriver = queueManager.getDriver('file');

// Check available drivers
print(queueManager.driverNames); // ['memory', 'file', 'sync', 'redis']
print(queueManager.hasDriver('redis')); // true

Configuration Options

  • default - Default driver name
  • drivers - Map of driver configurations
  • driver - Driver type (memory, file, sync, redis)
  • path - Storage path for file driver
  • host - Redis host for redis driver
  • port - Redis port for redis driver
  • track_metrics - Enable metrics tracking (default: true)
  • use_dlq - Enable dead letter queue (default: true)
  • max_retries - Maximum retry attempts (default: 3)
  • timeout - Job timeout in seconds (optional)

Delayed Jobs

Sometimes you need to schedule jobs for future execution rather than processing them immediately. Delayed jobs allow you to defer execution by a specific duration, making them perfect for scheduling tasks like sending reminder emails, processing scheduled reports, or implementing retry strategies with exponential backoff.

dart
// Immediate job execution
await queueManager.dispatch(UrgentJob());

// Delayed job execution
await queueManager.dispatch(
  ScheduledJob(),
  delay: Duration(minutes: 5),
);

// Various delay examples
final delays = [
  Duration(seconds: 30),     // 30 seconds
  Duration(minutes: 5),      // 5 minutes
  Duration(hours: 1),        // 1 hour
  Duration(days: 1),         // 1 day
  Duration(hours: 24),       // 24 hours
];

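// Schedule newsletter for 9 AM tomorrow (local time)
final now = DateTime.now();
final nineAmTomorrow = DateTime(now.year, now.month, now.day + 1, 9);
await queueManager.dispatch(
  SendNewsletterJob(),
  delay: nineAmTomorrow.difference(now), // Time remaining until 9 AM tomorrow
);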

// Retry with backoff
class RetryWithBackoffJob extends QueueJob {
  final int attempt;

  RetryWithBackoffJob([this.attempt = 0]);

  @override
  Future<void> handle() async {
    try {
      await riskyOperation();
      print('✅ Operation succeeded on attempt ${attempt + 1}');
    } catch (e) {
      if (attempt < maxRetries) {
        // Exponential backoff: 30s, 1m, 2m, 4m, 8m...
        final delay = Duration(seconds: 30 * (1 << attempt));
        // Assumes queueManager is reachable from inside the job
        await queueManager.dispatch(
          RetryWithBackoffJob(attempt + 1),
          delay: delay,
        );
        print('⏰ Retrying in ${delay.inSeconds} seconds...');
      } else {
        print('❌ Operation failed permanently after ${attempt + 1} attempts');
      }
    }
  }

  @override
  String get displayName => 'Retry Operation (Attempt ${attempt + 1})';

  @override
  int get maxRetries => 5;
}

⏰ Common Delay Patterns

  • Duration(seconds: 30) - Quick retry delays
  • Duration(minutes: 5) - Short-term scheduling
  • Duration(hours: 1) - Hourly tasks
  • Duration(days: 1) - Daily reports/cleanup
  • null - Immediate processing (default)

💡 Use Cases

  • Reminder Emails: Send 24h before event
  • Retry Logic: Exponential backoff delays
  • Scheduled Reports: Generate at specific times
  • Rate Limiting: Spread API calls over time
  • Trial Expiry: Check status after trial period

How Delays Work: When you dispatch a job with a delay, the queue stores the job along with its scheduled execution time. Workers check for jobs that are ready to execute (where the scheduled time has passed) before processing them. This ensures jobs run at the right time even if workers restart.
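
The readiness check described above can be sketched as a small in-memory structure. This is an illustration of the idea only, not Khadem's driver code; `ScheduledEntry`, `DelayedQueueSketch`, and their methods are hypothetical names, and the current time is passed in explicitly so the behavior is easy to follow.

```dart
// A queued job together with the time at which it becomes eligible to run.
class ScheduledEntry {
  final String name;
  final DateTime scheduledFor;

  ScheduledEntry(this.name, this.scheduledFor);
}

class DelayedQueueSketch {
  final List<ScheduledEntry> _entries = [];

  // Stores the job with its scheduled time: now + delay, or now if no delay.
  void push(String name, DateTime now, {Duration? delay}) {
    _entries.add(ScheduledEntry(name, now.add(delay ?? Duration.zero)));
  }

  // Removes and returns the entries whose scheduled time has passed,
  // earliest first. Entries scheduled for later stay in the queue.
  List<ScheduledEntry> takeReady(DateTime now) {
    bool isReady(ScheduledEntry e) => !e.scheduledFor.isAfter(now);

    final ready = _entries.where(isReady).toList()
      ..sort((a, b) => a.scheduledFor.compareTo(b.scheduledFor));
    _entries.removeWhere(isReady);
    return ready;
  }

  int get pending => _entries.length;
}
```

Because the scheduled time is stored with the job rather than held in a timer, a worker that restarts only needs to compare stored times against the clock to find what is due.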

Queue Workers

Workers are background processes that continuously poll the queue and execute jobs. They run in a loop, checking for pending jobs, executing them, handling errors, and managing retries. A well-configured worker is essential for a healthy queue system, balancing throughput with resource usage while providing visibility into job execution.

dart
// Basic worker
await queueManager.startWorker();

// Worker with custom configuration
await queueManager.startWorker(
  maxJobs: 100,                    // Process max 100 jobs
  delay: Duration(seconds: 5),     // 5 second delay between jobs
  timeout: Duration(hours: 1),     // Stop after 1 hour
  runInBackground: true,           // Run in background
);

// Worker with event callbacks
await queueManager.startWorker(
  onJobStart: (job) {
    print('🚀 Starting job: ${job.displayName}');
  },
  onJobComplete: (job, result) {
    print('✅ Completed job: ${job.displayName}');
  },
  onJobError: (job, error, stack) {
    print('❌ Failed job: ${job.displayName} - $error');
    // Send alert, log to external service, etc.
  },
  onError: (error, stack) {
    print('💥 Worker error: $error');
    // Handle worker-level errors
  },
);

// Long-running worker for production
await queueManager.startWorker(
  delay: Duration(milliseconds: 500),  // Fast processing
  maxJobs: null,                       // Run indefinitely
  timeout: null,                       // No timeout
  runInBackground: true,
);

// Development worker with verbose logging
await queueManager.startWorker(
  delay: Duration(seconds: 2),
  onJobStart: (job) => print('[DEV] Processing: ${job.displayName}'),
  onJobComplete: (job, result) => print('[DEV] Completed: ${job.displayName}'),
  onJobError: (job, error, stack) => print('[DEV] Error in ${job.displayName}: $error'),
);

⚙️ Configuration Options

  • maxJobs - Limit jobs before stopping (null = infinite)
  • delay - Pause between jobs (prevents CPU overload)
  • timeout - Maximum worker runtime (null = no limit)
  • runInBackground - Non-blocking execution
  • onJobStart - Called when job begins
  • onJobComplete - Called on successful completion
  • onJobError - Called when job fails
  • onError - Called for worker-level errors

🎯 Worker Strategies

  • Development: 2s delay, verbose callbacks
  • Production: 500ms delay, monitoring hooks
  • Batch Processing: No delay, max jobs limit
  • Long-Running: Background, no timeout
  • Testing: Sync execution, immediate feedback

Worker Lifecycle: Workers continuously check the queue → fetch next job → execute job's handle() method → handle success/failure → apply delay → repeat. If a job fails and has remaining retries, the worker automatically requeues it. Workers gracefully shut down when reaching maxJobs, hitting the timeout, or encountering critical errors.
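
The lifecycle can be sketched as a plain loop. This is a simplified illustration, not Khadem's worker: `runWorkerSketch` and `JobBody` are hypothetical names, jobs are reduced to async closures, retries are left out, and the loop ends when the queue is empty so that the example terminates (a real worker would keep polling).

```dart
import 'dart:collection';

// A job reduced to its essentials: an async body that may throw.
typedef JobBody = Future<void> Function();

// Simplified worker loop: fetch, execute, report, pause, repeat.
// Returns the number of jobs it processed (successful or failed).
Future<int> runWorkerSketch(
  Queue<JobBody> queue, {
  int? maxJobs,
  Duration? timeout,
  Duration delay = Duration.zero,
  void Function(Object error)? onJobError,
}) async {
  final stopwatch = Stopwatch()..start();
  var processed = 0;

  while (queue.isNotEmpty) {
    // Stop conditions mirror the maxJobs and timeout options.
    if (maxJobs != null && processed >= maxJobs) break;
    if (timeout != null && stopwatch.elapsed >= timeout) break;

    final job = queue.removeFirst();
    try {
      await job();
    } catch (error) {
      // A failed job is reported but does not stop the loop.
      onJobError?.call(error);
    }
    processed++;

    // Pause between jobs, as the delay option does.
    await Future<void>.delayed(delay);
  }
  return processed;
}
```

Catching the error inside the loop is what keeps one failing job from taking the worker down with it.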

⚠️ Important Notes

  • Always run at least one worker in production, or jobs will never execute
  • Use runInBackground: true for production to avoid blocking
  • Set appropriate delay to balance throughput and CPU usage
  • Implement onJobError for monitoring and alerting
  • For critical jobs, consider running multiple workers for redundancy

Batch Processing

Batch processing allows you to queue multiple jobs at once or create single jobs that process multiple items. This approach is more efficient than creating individual jobs for each item, reducing overhead and improving throughput. You can either dispatch multiple jobs in one operation or design jobs that handle multiple items internally.

dart
// Process multiple jobs at once
final jobs = [
  SendEmailJob('user1@example.com', 'Welcome User 1'),
  SendEmailJob('user2@example.com', 'Welcome User 2'),
  SendEmailJob('user3@example.com', 'Welcome User 3'),
  ProcessPaymentJob('order_123', 99.99),
  UpdateInventoryJob('product_456', -5),
];

// Dispatch all at once
await queueManager.dispatchBatch(jobs);

// Dispatch the batch with a 10-second delay
await queueManager.dispatchBatch(jobs, delay: Duration(seconds: 10));

// Batch processing job
class BulkEmailJob extends QueueJob {
  final List<String> emails;
  final String subject;
  final String template;

  BulkEmailJob(this.emails, this.subject, this.template);

  @override
  Future<void> handle() async {
    print('Sending ${emails.length} emails...');

    for (final email in emails) {
      try {
        final personalizedContent = template.replaceAll('{{email}}', email);
        await sendEmail(email, subject, personalizedContent);
        print('✅ Sent to $email');
      } catch (e) {
        print('❌ Failed to send to $email: $e');
        // Continue with other emails
      }
    }

    print('✅ Bulk email job completed');
  }

  @override
  String get displayName => 'Send Bulk Email (${emails.length} recipients)';

  @override
  int get maxRetries => 2;
}

// Usage
await queueManager.dispatch(BulkEmailJob(
  ['user1@example.com', 'user2@example.com', 'user3@example.com'],
  'Monthly Newsletter',
  'Hello {{email}}, here is your newsletter...',
));

✨ Batch Benefits

  • Efficiency: Fewer queue operations
  • Resource Sharing: Reuse connections/contexts
  • Reduced Overhead: Less serialization/deserialization
  • Better Throughput: Process more items faster
  • Atomicity: Related operations stay together

📋 When to Use Batching

  • Sending emails to multiple recipients
  • Importing data from CSV/Excel files
  • Bulk updating database records
  • Processing multiple payment transactions
  • Generating multiple reports at once

Choosing an Approach: Use dispatchBatch() when you have multiple independent jobs of different types. Use a single bulk job (like BulkEmailJob) when processing many similar items that can share resources (database connections, API clients). Both approaches work, but bulk jobs are generally more efficient for large datasets.

💡 Best Practices

  • Chunk Large Batches: Split 10,000 emails into 10 jobs of 1,000 emails each rather than one massive job
  • Handle Partial Failures: Use try-catch within loops so one failure doesn't stop the entire batch
  • Progress Tracking: Log progress regularly in batch jobs for visibility
  • Resource Limits: Be mindful of memory when processing large batches
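
The chunking advice can be sketched with a small helper. `chunk` is a hypothetical function written for this example, not part of Khadem; it uses only `List.sublist` from the Dart core library.

```dart
// Splits `items` into consecutive chunks of at most `size` elements.
// The last chunk may be shorter than the others.
List<List<T>> chunk<T>(List<T> items, int size) {
  if (size <= 0) {
    throw ArgumentError.value(size, 'size', 'must be positive');
  }
  final chunks = <List<T>>[];
  for (var start = 0; start < items.length; start += size) {
    final end = start + size < items.length ? start + size : items.length;
    chunks.add(items.sublist(start, end));
  }
  return chunks;
}
```

Each chunk could then be wrapped in its own BulkEmailJob and the resulting list passed to dispatchBatch(), so a failure in one chunk only retries that chunk.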

Monitoring and Metrics

Understanding your queue's health and performance is crucial for production systems. The queue system tracks comprehensive metrics about job execution, success rates, and processing times. These metrics help you identify bottlenecks, detect failures, and optimize performance. Regular monitoring ensures your background jobs run smoothly and alerts you to problems before they impact users.

dart
// Timer is used by QueueMonitor below
import 'dart:async';

// Import metrics (if accessing directly)
import 'package:khadem/src/core/queue/metrics/index.dart';

// Get driver statistics
final stats = await queueManager.driver.getStats();
print('Queue Statistics:');
print('  Driver: ${stats['driver_name']}');
print('  Queue Depth: ${stats['queue_depth']}');
print('  Processing: ${stats['processing_count']}');
print('  Pending: ${stats['pending_count']}');
print('  Ready Jobs: ${stats['ready_jobs']}');
print('  Delayed Jobs: ${stats['delayed_jobs']}');

// Metrics tracking (if enabled)
if (stats.containsKey('metrics')) {
  final metrics = stats['metrics'] as Map<String, dynamic>;
  print('Metrics:');
  print('  Total Queued: ${metrics['jobs_queued']}');
  print('  Total Completed: ${metrics['jobs_completed']}');
  print('  Total Failed: ${metrics['jobs_failed']}');
  print('  Success Rate: ${(metrics['success_rate'] * 100).toStringAsFixed(1)}%');
  
  // Jobs by type
  final byType = metrics['jobs_by_type'] as Map<String, dynamic>;
  print('Jobs by Type:');
  byType.forEach((type, count) {
    print('  $type: $count');
  });
}

// Dead Letter Queue statistics (if enabled)
if (stats.containsKey('dlq')) {
  final dlq = stats['dlq'] as Map<String, dynamic>;
  print('Dead Letter Queue:');
  print('  Total: ${dlq['total_jobs']}');
  print('  By Job Type:');
  (dlq['jobs_by_type'] as Map<String, dynamic>).forEach((type, count) {
    print('    $type: $count');
  });
}

// Custom monitoring with periodic checks
class QueueMonitor {
  final QueueManager _queueManager;

  QueueMonitor(this._queueManager);

  void startMonitoring() {
    Timer.periodic(Duration(minutes: 5), (timer) async {
      final stats = await _queueManager.driver.getStats();
      final metrics = stats['metrics'] as Map<String, dynamic>?;

      if (metrics == null) {
        print('⚠️ Metrics not enabled');
        return;
      }

      // Check for issues
      // Cast via num so an integer value such as 1 does not fail the cast
      final successRate = (metrics['success_rate'] as num).toDouble();
      if (successRate < 0.9) { // < 90% success rate
        print('⚠️ Low success rate: ${(successRate * 100).toStringAsFixed(1)}%');
        // Send alert
      }

      final queueDepth = stats['queue_depth'] as int;
      if (queueDepth > 100) {
        print('⚠️ Large queue backlog: $queueDepth jobs');
        // Scale up workers
      }

      // Log metrics
      print('📊 Queue Health Check:');
      print('   Success Rate: ${(successRate * 100).toStringAsFixed(1)}%');
      print('   Processing: ${stats['processing_count']} jobs');
      print('   Queue Depth: $queueDepth jobs');
    });
  }
}

// Reset metrics (for testing)
// Note: This requires direct access to the driver's metrics object, so add this
// import at the top of the file (Dart requires imports before other code):
// import 'package:khadem/src/core/queue/drivers/base_driver.dart';

final driver = queueManager.driver as BaseQueueDriver;
if (driver.metrics != null) {
  driver.metrics!.reset();
}

📊 Available Metrics

  • driver_name - Name of the queue driver
  • queue_depth - Total jobs in queue
  • processing_count - Jobs currently being processed
  • pending_count - Jobs waiting to be processed
  • ready_jobs - Jobs ready for immediate execution
  • delayed_jobs - Jobs scheduled for later
  • jobs_queued - Total jobs ever queued
  • jobs_completed - Total successful jobs
  • jobs_failed - Total failed jobs
  • success_rate - Success ratio as a fraction from 0.0 to 1.0 (multiply by 100 for a percentage)
  • jobs_by_type - Map of job counts by type

🚨 What to Monitor

  • Low Success Rate: < 90% indicates issues
  • Growing Queue: Jobs queuing faster than processing
  • Large Queue Depth: Over 100 jobs backed up
  • Stuck Jobs: processing_count not changing
  • No Activity: Zero completed jobs (worker down?)
  • Dead Letter Queue: Check for permanently failed jobs

Monitoring Strategy: Set up periodic checks (every 5-15 minutes) to review statistics from driver.getStats(). Look for anomalies like sudden spikes in failures, increasing queue depth, or low success rates. Integrate with alerting systems (email, Slack, PagerDuty) to notify you of critical issues. Track trends over time to identify patterns and plan capacity upgrades.
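
One way to make such checks easy to test and reuse is to keep the threshold logic in a pure function that takes the stats map and returns warnings. The sketch below assumes the map has the shape shown in the getStats() example (`queue_depth` and an optional `metrics` map with `success_rate`). `checkQueueHealth` and its default thresholds are illustrative choices, not Khadem defaults.

```dart
// Evaluates a stats map and returns human-readable warnings.
// An empty list means no threshold was crossed.
List<String> checkQueueHealth(
  Map<String, dynamic> stats, {
  double minSuccessRate = 0.9,
  int maxQueueDepth = 100,
}) {
  final warnings = <String>[];

  final metrics = stats['metrics'] as Map<String, dynamic>?;
  if (metrics == null) {
    warnings.add('Metrics not enabled');
  } else {
    // success_rate is a fraction between 0.0 and 1.0.
    final successRate = (metrics['success_rate'] as num).toDouble();
    if (successRate < minSuccessRate) {
      final percent = (successRate * 100).toStringAsFixed(1);
      warnings.add('Low success rate: $percent%');
    }
  }

  final queueDepth = (stats['queue_depth'] as num?)?.toInt() ?? 0;
  if (queueDepth > maxQueueDepth) {
    warnings.add('Large queue backlog: $queueDepth jobs');
  }
  return warnings;
}
```

A periodic timer can call this with the result of driver.getStats() and forward any non-empty result to whichever alerting channel is in use.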

💡 Monitoring Tips

  • Use Timer.periodic for automated health checks
  • Access metrics via await queueManager.driver.getStats()
  • Set up alerts for success rates below acceptable thresholds
  • Export metrics to external systems (Prometheus, Datadog, CloudWatch)
  • Create dashboards visualizing key metrics over time
  • Enable track_metrics in driver configuration
  • Monitor dead letter queue for permanently failed jobs

Error Handling and Retries

Jobs can fail for many reasons: network timeouts, external service outages, temporary resource constraints, or unexpected data. A robust queue system must handle failures gracefully, retry when appropriate, and prevent cascading failures. Khadem's queue provides automatic retry logic with configurable limits, comprehensive error tracking, and multiple levels of error handling to keep your system resilient.

dart
// Job with error handling
class ProcessOrderJob extends QueueJob {
  final String orderId;

  ProcessOrderJob(this.orderId);

  @override
  Future<void> handle() async {
    try {
      // Step 1: Validate order
      final order = await Order.find(orderId);
      if (order == null) {
        throw Exception('Order not found: $orderId');
      }

      // Step 2: Process payment
      await PaymentService.charge(order);

      // Step 3: Update inventory
      for (final item in order.items) {
        await Inventory.decrement(item.productId, item.quantity);
      }

      // Step 4: Send confirmation
      await EmailService.sendOrderConfirmation(order);

      print('✅ Order $orderId processed successfully');

    } catch (e) {
      print('❌ Failed to process order $orderId: $e');

      // Log detailed error information
      await ErrorLogger.log('order_processing_failed', {
        'orderId': orderId,
        'error': e.toString(),
        'timestamp': DateTime.now().toIso8601String(),
      });

      // Rethrow so the worker sees the failure. A swallowed exception
      // would count as success and the job would never be retried.
      rethrow;
    }
  }

  @override
  String get displayName => 'Process Order $orderId';

  @override
  int get maxRetries => 3;
}

// Worker with error handling
await queueManager.startWorker(
  onJobError: (job, error, stack) async {
    // Log to external service
    await ExternalLogger.error('Queue job failed', {
      'job': job.displayName,
      'error': error.toString(),
      'stack': stack.toString(),
      'timestamp': DateTime.now().toIso8601String(),
    });

    // Send alert for critical jobs
    if (job.displayName.contains('Payment') || job.displayName.contains('Order')) {
      await AlertService.send('Critical job failed: ${job.displayName}');
    }
  },
  onError: (error, stack) async {
    // Worker-level error
    await ExternalLogger.critical('Queue worker error', {
      'error': error.toString(),
      'stack': stack.toString(),
    });

    // Restart worker or send alert
    await AlertService.send('Queue worker crashed - manual intervention required');
  },
);

// Graceful error recovery
class ResilientJob extends QueueJob {
  @override
  Future<void> handle() async {
    const maxAttempts = 3; // In-job attempts; separate from the maxRetries getter
    var attempt = 0;

    while (attempt < maxAttempts) {
      try {
        await performRiskyOperation();
        print('✅ Operation succeeded');
        return;
      } catch (e) {
        attempt++;
        print('⚠️  Attempt $attempt failed: $e');

        if (attempt < maxAttempts) {
          // Wait before retry (linear backoff: 5s, 10s, ...)
          await Future.delayed(Duration(seconds: attempt * 5));
        }
      }
    }

    throw Exception('Operation failed after $maxAttempts attempts');
  }

  @override
  String get displayName => 'Resilient Operation';

  @override
  int get maxRetries => 0; // Handle retries internally
}

🔄 Automatic Retry Behavior

  • Jobs retry automatically when they throw exceptions
  • Each job defines maxRetries (default: 3)
  • Retry count tracked across worker restarts (persistent drivers)
  • Failed jobs logged but don't crash the worker
  • After max retries, job moves to failed state
  • Exponential backoff recommended for retries
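
The list above recommends exponential backoff without giving a formula. The following is a minimal sketch of one way to compute such delays. `backoffDelay` and `withJitter` are hypothetical helpers, not part of Khadem, and the base delay, cap, and 25% jitter are illustrative assumptions.

```dart
import 'dart:math';

/// Hypothetical helper (not a Khadem API): delay before retry number
/// [attempt] (1-based). The delay doubles each time and is capped at
/// [maxDelay].
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(seconds: 2),
  Duration maxDelay = const Duration(minutes: 5),
}) {
  if (attempt < 1) {
    throw ArgumentError.value(attempt, 'attempt', 'must be >= 1');
  }
  // Clamp the exponent so the shift stays small for large attempt numbers.
  final exponent = min(attempt - 1, 20);
  final delay = base * (1 << exponent);
  return delay > maxDelay ? maxDelay : delay;
}

/// Adds up to 25% random jitter so that many failing jobs do not all
/// retry at the same instant.
Duration withJitter(Duration delay, Random random) {
  final extraMs = (delay.inMilliseconds * 0.25 * random.nextDouble()).round();
  return delay + Duration(milliseconds: extraMs);
}
```

With the defaults, attempts 1, 2, and 3 wait 2, 4, and 8 seconds, and any attempt whose doubled delay would exceed five minutes waits exactly five minutes. A job that retries internally could pass the result to `Future.delayed`.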

🎯 Error Handling Levels

  • Job Level: Try-catch in handle() method
  • Worker Level: onJobError callback
  • System Level: onError for worker crashes
  • Monitoring: External logging/alerting systems

Error Handling Flow: When a job throws an exception during handle(), the worker catches it, increments the retry counter, and checks whether retries remain. If so, the job is requeued for another attempt. Once maxRetries is exhausted, the job enters the failed state and the onJobError callback is invoked. The worker continues processing other jobs; one failed job doesn't stop the entire queue.

⚠️ Error Handling Best Practices

  • Idempotent Jobs: Design jobs that can be safely retried without side effects
  • Explicit Logging: Log errors with context (job ID, attempt number, payload)
  • Graceful Degradation: Catch exceptions and handle them appropriately
  • External Monitoring: Send critical failures to alerting systems
  • Set Appropriate Limits: Don't retry forever; know when to give up
  • Distinguish Errors: Permanent failures (bad data) vs temporary (network timeout)
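
To make the "Idempotent Jobs" practice concrete, here is a small sketch of a multi-step job that records each completed step, so a retry skips work that already succeeded. `CompletedStepLog` and `processOrder` are hypothetical names for illustration and are not Khadem APIs. The in-memory set stands in for what would normally be a database table with a unique key, and the `failAt` parameter only simulates a transient failure.

```dart
/// Hypothetical record of completed steps (not a Khadem API). A real
/// application would persist this so that it survives restarts.
class CompletedStepLog {
  final Set<String> _done = <String>{};

  bool isDone(String key) => _done.contains(key);

  void markDone(String key) => _done.add(key);
}

/// Runs each step at most once per [orderId] and returns the steps that
/// actually executed during this call. [failAt] simulates a failure.
List<String> processOrder(
  String orderId,
  CompletedStepLog log, {
  String? failAt,
}) {
  final executed = <String>[];
  for (final step in const ['charge', 'decrement_inventory', 'send_email']) {
    final key = '$orderId:$step';
    if (log.isDone(key)) continue; // Completed on an earlier attempt.
    if (step == failAt) {
      throw StateError('simulated failure in $step');
    }
    // ... perform the real side effect for this step here ...
    log.markDone(key);
    executed.add(step);
  }
  return executed;
}
```

If the first attempt fails at `decrement_inventory`, the charge is already recorded, so the retry resumes from the inventory step instead of charging the customer a second time.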

✅ When to Retry vs. When to Fail

✅ Safe to Retry:

  • Network timeouts or connection errors
  • External service temporarily unavailable (503 errors)
  • Rate limit exceeded (429 errors)
  • Database deadlocks or lock timeouts
  • Temporary resource constraints (memory, file handles)

❌ Don't Retry:

  • Invalid data or validation errors (400, 422)
  • Authentication failures (401, 403)
  • Resource not found (404)
  • Business logic violations
  • Permanent external service errors (410 Gone)
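
The two lists above can be turned into a small decision function for HTTP-based integrations. This is a sketch only: `isRetryableStatus` is a hypothetical helper, not a Khadem API, and treating 408 and every 5xx code as transient is an assumption that goes beyond the codes listed above.

```dart
/// Hypothetical helper (not a Khadem API): decides whether an HTTP status
/// code returned by an external service is worth retrying.
bool isRetryableStatus(int statusCode) {
  // Request timeout and rate limiting are usually transient.
  const retryableClientErrors = {408, 429};
  if (retryableClientErrors.contains(statusCode)) return true;

  // Other 4xx codes (400, 401, 403, 404, 410, 422, ...) point at the
  // request itself; repeating the same request will not help.
  if (statusCode >= 400 && statusCode < 500) return false;

  // 5xx codes are server-side problems and are assumed to be transient.
  if (statusCode >= 500 && statusCode < 600) return true;

  // 1xx-3xx responses are not failures, so there is nothing to retry.
  return false;
}
```

Inside handle(), a job could rethrow when the status is retryable and otherwise log the error and return, so the queue does not spend retries on a request that can never succeed.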

Real-World Examples

Let's explore practical examples of how to use the queue system in real applications. These examples demonstrate common patterns like email processing, data imports, scheduled tasks, and more. Each example shows complete job implementation with proper error handling, logging, and best practices.

📧

Email Processing

Send welcome emails to new users and newsletters to subscribers. Emails are processed in the background, so user registration is fast. Failed emails are retried automatically, and the system tracks delivery status.

dart
// Email processing job
class SendWelcomeEmailJob extends QueueJob {
  final User user;

  SendWelcomeEmailJob(this.user);

  @override
  Future<void> handle() async {
    try {
      // Personalize email content
      final subject = 'Welcome to Our Platform, ${user.firstName}!';
      final body = '''
        Hi ${user.firstName},

        Welcome to our platform! We're excited to have you here.

        Your account details:
        - Email: ${user.email}
        - Registration Date: ${user.createdAt}

        Best regards,
        The Team
      ''';

      // Send email
      await EmailService.send(
        to: user.email,
        subject: subject,
        body: body,
        html: generateWelcomeHtml(user),
      );

      // Update user status
      await user.update({'welcome_email_sent': true});

      print('✅ Welcome email sent to ${user.email}');

    } catch (e) {
      print('❌ Failed to send welcome email to ${user.email}: $e');
      rethrow; // Let the queue handle the retry, keeping the stack trace
    }
  }

  @override
  String get displayName => 'Send Welcome Email to ${user.email}';

  @override
  int get maxRetries => 3;
}

// Bulk email job
class SendNewsletterJob extends QueueJob {
  final List<User> subscribers;
  final Newsletter newsletter;

  SendNewsletterJob(this.subscribers, this.newsletter);

  @override
  Future<void> handle() async {
    var successCount = 0;
    var failureCount = 0;

    for (final user in subscribers) {
      try {
        await EmailService.sendNewsletter(user, newsletter);
        successCount++;
        print('✅ Newsletter sent to ${user.email}');
      } catch (e) {
        failureCount++;
        print('❌ Failed to send newsletter to ${user.email}: $e');
      }

      // Small delay to avoid overwhelming email service
      await Future.delayed(Duration(milliseconds: 100));
    }

    print('📊 Newsletter sent: $successCount success, $failureCount failed');

    // Log completion
    await Analytics.track('newsletter_sent', {
      'newsletter_id': newsletter.id,
      'total_sent': successCount,
      'total_failed': failureCount,
    });
  }

  @override
  String get displayName => 'Send Newsletter: ${newsletter.title}';

  @override
  int get maxRetries => 2;
}
📊

Data Processing

Import large CSV files or process bulk data updates. These operations can take minutes or hours, so running them in the background prevents blocking the main application. Progress is tracked and logged, and failures are handled per row so that one bad record does not abort the whole import.

dart
// Data import job
import 'dart:io'; // Provides File
class ImportUsersJob extends QueueJob {
  final String csvFilePath;

  ImportUsersJob(this.csvFilePath);

  @override
  Future<void> handle() async {
    final file = File(csvFilePath);
    final lines = await file.readAsLines();

    var imported = 0;
    var skipped = 0;
    var errors = 0;

    // Skip header
    for (var i = 1; i < lines.length; i++) {
      final line = lines[i];
      try {
        final userData = parseCsvLine(line);

        // Validate data
        if (!isValidUserData(userData)) {
          skipped++;
          continue;
        }

        // Create user
        await User.create(userData);
        imported++;

        // Progress logging
        if (imported % 100 == 0) {
          print('📊 Imported $imported users...');
        }

      } catch (e) {
        errors++;
        print('❌ Error importing line ${i + 1}: $e');
      }
    }

    print('✅ Import completed: $imported imported, $skipped skipped, $errors errors');

    // Cleanup
    await file.delete();

    // Send completion notification
    await NotificationService.send('Data import completed', {
      'file': csvFilePath,
      'imported': imported,
      'skipped': skipped,
      'errors': errors,
    });
  }

  @override
  String get displayName => 'Import Users from CSV';

  @override
  int get maxRetries => 1;
}

// Data cleanup job
class CleanupOldDataJob extends QueueJob {
  final Duration olderThan;

  CleanupOldDataJob(this.olderThan);

  @override
  Future<void> handle() async {
    final cutoffDate = DateTime.now().subtract(olderThan);

    // Clean up old logs
    final deletedLogs = await Database.table('logs')
      .where('created_at', '<', cutoffDate)
      .delete();

    // Clean up old sessions
    final deletedSessions = await Database.table('sessions')
      .where('last_activity', '<', cutoffDate)
      .delete();

    // Clean up old notifications
    final deletedNotifications = await Database.table('notifications')
      .where('created_at', '<', cutoffDate)
      .where('read', true)
      .delete();

    print('🧹 Cleanup completed:');
    print('   Logs deleted: $deletedLogs');
    print('   Sessions deleted: $deletedSessions');
    print('   Notifications deleted: $deletedNotifications');

    // Log cleanup operation
    await AuditLogger.log('data_cleanup', {
      'older_than_days': olderThan.inDays,
      'logs_deleted': deletedLogs,
      'sessions_deleted': deletedSessions,
      'notifications_deleted': deletedNotifications,
    });
  }

  @override
  String get displayName => 'Cleanup Data Older Than ${olderThan.inDays} Days';

  @override
  int get maxRetries => 2;
}
⏰

Scheduled Tasks

Schedule tasks for future execution like reminder emails, report generation, or cleanup operations. Use delayed jobs to run tasks at specific times. These examples show how to handle recurring tasks and time-based operations.

dart
// Daily report job
class GenerateDailyReportJob extends QueueJob {
  @override
  Future<void> handle() async {
    final yesterday = DateTime.now().subtract(Duration(days: 1));
    final reportData = await generateReport(yesterday);

    // Save report to database
    final report = await Report.create({
      'date': yesterday,
      'data': reportData,
      'generated_at': DateTime.now(),
    });

    // Send report via email
    await EmailService.sendReport(report);

    // Archive old reports (older than 30 days)
    await Report.where('date', '<', DateTime.now().subtract(Duration(days: 30)))
      .update({'archived': true});

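    // DateTime has no toDateString(); take the date part of the ISO string
    final reportDate = yesterday.toIso8601String().split('T').first;
    print('✅ Daily report generated for $reportDate');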
  }

  @override
  String get displayName => 'Generate Daily Report';

  @override
  int get maxRetries => 3;
}

// Schedule daily reports
class ReportScheduler {
  final QueueManager _queueManager;

  ReportScheduler(this._queueManager);

  Future<void> scheduleDailyReports() async {
    // Schedule the first report for tomorrow at 9 AM local time.
    // The DateTime constructor normalizes day overflow and zeroes
    // the seconds and sub-second fields.
    final now = DateTime.now();
    final tomorrow9AM = DateTime(now.year, now.month, now.day + 1, 9);

    final initialDelay = tomorrow9AM.difference(now);

    await _queueManager.dispatch(
      GenerateDailyReportJob(),
      delay: initialDelay,
    );

    // This dispatches a single run. For a recurring report, use a
    // cron-like scheduler or have each job dispatch the next one.
  }
}

// Maintenance job
class DatabaseMaintenanceJob extends QueueJob {
  @override
  Future<void> handle() async {
    print('🔧 Starting database maintenance...');
    final stopwatch = Stopwatch()..start();

    // Optimize tables
    await Database.optimizeTables();

    // Update statistics
    await Database.analyzeTables();

    // Clean up orphaned records
    await Database.cleanupOrphanedRecords();

    // Rebuild indexes
    await Database.rebuildIndexes();

    print('✅ Database maintenance completed');

    // Log maintenance completion with the measured duration
    await SystemLogger.info('Database maintenance completed', {
      'timestamp': DateTime.now().toIso8601String(),
      'duration_ms': stopwatch.elapsedMilliseconds,
    });
  }

  @override
  String get displayName => 'Database Maintenance';

  @override
  int get maxRetries => 2;
}

// Schedule maintenance to run one week from now
// (a single run; dispatch again after each run for a weekly cadence)
final maintenanceDelay = Duration(days: 7);
await queueManager.dispatch(
  DatabaseMaintenanceJob(),
  delay: maintenanceDelay,
);
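
Computing the delay until a wall-clock time is easy to get subtly wrong around month boundaries or when the target time has already passed today. The sketch below shows one way to do it. `nextOccurrence` and `delayUntil` are hypothetical helpers, not Khadem APIs, and they work in local time without any special handling of daylight-saving transitions.

```dart
/// Hypothetical helper (not a Khadem API): the next local time at which
/// the clock reads [hour]:[minute], strictly after [now].
DateTime nextOccurrence(DateTime now, {required int hour, int minute = 0}) {
  final today = DateTime(now.year, now.month, now.day, hour, minute);
  if (today.isAfter(now)) return today;
  // The DateTime constructor normalizes overflow, so day + 1 rolls over
  // month and year boundaries correctly.
  return DateTime(now.year, now.month, now.day + 1, hour, minute);
}

/// Delay to pass to a delayed dispatch so the job runs at that time.
Duration delayUntil(DateTime now, {required int hour, int minute = 0}) =>
    nextOccurrence(now, hour: hour, minute: minute).difference(now);
```

Passing `now` in as a parameter, rather than calling `DateTime.now()` inside the helper, keeps the calculation deterministic and easy to unit test.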

💡 Key Patterns in These Examples

  • Progress Tracking: Log progress throughout long-running jobs
  • Partial Failure Handling: One failure doesn't stop the entire batch
  • Resource Cleanup: Properly close connections and free resources
  • State Updates: Update database records to track job completion
  • Error Context: Log enough information to debug failures later
  • Retry Safety: Jobs are idempotent and can be retried safely

Custom Drivers

While Khadem provides built-in drivers for most use cases, you can create custom queue drivers for specialized requirements. Custom drivers are useful for integrating with proprietary queue systems, implementing unique storage strategies, or adding specialized features like priority queues, job deduplication, or custom scheduling logic.

dart
// Import required contracts and base classes
import 'dart:convert'; // jsonEncode / jsonDecode
import 'package:khadem/src/contracts/queue/queue_driver.dart';
import 'package:khadem/src/contracts/queue/queue_job.dart';
import 'package:khadem/src/core/queue/drivers/base_driver.dart';

// Custom database queue driver
class DatabaseQueueDriver extends BaseQueueDriver {
  final DatabaseConnection _db;

  DatabaseQueueDriver(this._db);

  @override
  Future<void> push(QueueJob job, {Duration? delay}) async {
    final scheduledAt = DateTime.now().add(delay ?? Duration.zero);

    await _db.table('jobs').insert({
      'id': DateTime.now().millisecondsSinceEpoch.toString(),
      'type': job.runtimeType.toString(),
      'payload': jsonEncode(job.toJson()),
      'scheduled_at': scheduledAt.toIso8601String(),
      'created_at': DateTime.now().toIso8601String(),
      'attempts': 0,
      'max_retries': job.maxRetries,
      'status': 'pending',
    });

    print('πŸ—„οΈ Job queued to database: ${job.displayName}');
  }

  @override
  Future<void> process() async {
    // Find next pending job
    final jobRecord = await _db.table('jobs')
      .where('status', 'pending')
      .where('scheduled_at', '<=', DateTime.now().toIso8601String())
      .orderBy('created_at')
      .first();

    if (jobRecord == null) return;

    // Mark as processing
    await _db.table('jobs')
      .where('id', jobRecord['id'])
      .update({'status': 'processing'});

    try {
      // Create job instance and execute
      final job = _createJobFromRecord(jobRecord);
      await job.handle();

      // Mark as completed
      await _db.table('jobs')
        .where('id', jobRecord['id'])
        .update({'status': 'completed'});

      print('πŸ—„οΈ Job completed: ${job.displayName}');

    } catch (e) {
      // Handle failure and retry logic
      final attempts = jobRecord['attempts'] + 1;
      final maxRetries = jobRecord['max_retries'];

      // The first run is not a retry, so allow maxRetries + 1 attempts in total
      if (attempts > maxRetries) {
        await _db.table('jobs')
          .where('id', jobRecord['id'])
          .update({
            'status': 'failed',
            'attempts': attempts,
            'error': e.toString(),
          });
        print('🗄️ Job failed permanently: ${jobRecord['type']}');
      } else {
        // Schedule retry with a linearly growing delay (30s, 60s, ...)
        final retryDelay = Duration(seconds: 30 * attempts);
        final nextAttempt = DateTime.now().add(retryDelay);

        await _db.table('jobs')
          .where('id', jobRecord['id'])
          .update({
            'status': 'pending',
            'attempts': attempts,
            'scheduled_at': nextAttempt.toIso8601String(),
          });
        print('🗄️ Job retry scheduled: ${jobRecord['type']}');
      }
    }
  }

  QueueJob _createJobFromRecord(Map<String, dynamic> record) {
    // This would need a job registry/factory
    // Simplified version for demonstration
    final payload = jsonDecode(record['payload']);
    return GenericQueueJob.fromPayload(record['type'], payload);
  }
}

// Register custom driver
QueueFactory.registerDriver('database', DatabaseQueueDriver(database));

// Use custom driver
final config = {
  'queue': {
    'driver': 'database',
  },
};
final queueManager = QueueManager(config);

🔧 Required Methods

  • push(QueueJob job, {Duration? delay}) - Add job to queue
  • process() - Process next available job
  • clear() - Remove all jobs (optional)
  • size() - Get queue size (optional)

✅ Implementation Checklist

  • Handle delayed job execution correctly
  • Implement proper error handling
  • Support job serialization (if persistent)
  • Track retry counts and max retries
  • Ensure thread safety (if needed)
  • Add metrics and monitoring hooks
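
The first checklist item, handling delayed jobs correctly, comes down to never handing out a job before its scheduled time and picking the earliest ready job first. Below is a minimal in-memory sketch of that selection logic. `DelayedQueue` is a hypothetical class for illustration and does not implement Khadem's driver contract. The clock is passed in explicitly so the behavior can be tested without waiting.

```dart
/// One queued item together with the time it becomes available.
class _Entry<T> {
  _Entry(this.job, this.availableAt);

  final T job;
  final DateTime availableAt;
}

/// Hypothetical in-memory queue (not a Khadem driver) that respects delays.
class DelayedQueue<T> {
  final List<_Entry<T>> _entries = [];

  int get length => _entries.length;

  /// Queues [job] so that it becomes ready at [now] + [delay].
  void push(T job, DateTime now, {Duration delay = Duration.zero}) {
    _entries.add(_Entry<T>(job, now.add(delay)));
  }

  /// Removes and returns the ready job with the earliest availability,
  /// or null when nothing is due yet. Ties keep insertion order.
  T? popReady(DateTime now) {
    var best = -1;
    for (var i = 0; i < _entries.length; i++) {
      final entry = _entries[i];
      if (entry.availableAt.isAfter(now)) continue; // Still delayed.
      if (best == -1 ||
          entry.availableAt.isBefore(_entries[best].availableAt)) {
        best = i;
      }
    }
    return best == -1 ? null : _entries.removeAt(best).job;
  }
}
```

A persistent driver would express the same rule as a query, as the `scheduled_at <= now` filter in the database example does. The linear scan here is only meant to keep the sketch short.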

When to Create a Custom Driver: Consider a custom driver if you need to integrate with Amazon SQS, RabbitMQ, or other message queue systems, implement custom storage (like PostgreSQL's LISTEN/NOTIFY), add advanced features like priority levels or job dependencies, or optimize for specific performance requirements. For most applications, the built-in drivers are sufficient.

⚠️ Custom Driver Considerations

  • Thoroughly test edge cases (empty queue, delayed jobs, failures)
  • Document any limitations or special requirements
  • Consider job serialization carefully for persistent drivers
  • Implement proper cleanup in dispose() method
  • Add comprehensive logging for debugging
  • Register your driver with QueueFactory.registerDriver()

Best Practices

✅ Do's

  • Use appropriate drivers for your environment (memory for dev, file/redis for prod)
  • Set reasonable retry limits to prevent infinite retry loops
  • Monitor queue metrics and set up alerts for failures
  • Use delayed jobs for scheduled tasks and background processing
  • Handle errors gracefully in job implementations
  • Use batch processing for bulk operations
  • Configure worker timeouts to prevent hanging jobs
  • Log important job events for debugging
  • Test job implementations thoroughly
  • Use descriptive job names and proper error messages

❌ Don'ts

  • Don't perform long-running operations without proper timeouts
  • Don't rely on memory driver for production (jobs lost on restart)
  • Don't ignore job failures - always handle errors appropriately
  • Don't create jobs with side effects that can't be retried safely
  • Don't use synchronous driver for background processing
  • Don't forget to start queue workers in production
  • Don't use very short delays that could overwhelm the system
  • Don't log sensitive information in job error messages
  • Don't create jobs that depend on external state that may change
  • Don't forget to monitor queue health and performance

Performance Considerations

Optimization Tips

  • Memory driver is fastest but loses jobs on restart
  • File driver provides persistence with reasonable performance
  • Use appropriate worker delays to balance throughput and resource usage
  • Configure proper timeouts to prevent hanging jobs
  • Monitor memory usage with large job payloads
  • Consider job size limits for very large payloads

Resource Management

  • Set reasonable worker limits to prevent resource exhaustion
  • Use background workers for long-running applications
  • Configure proper error handling to prevent worker crashes
  • Monitor CPU and memory usage of queue workers
  • Use connection pooling for database/file operations

Testing Queue Jobs

Testing queue jobs is crucial for ensuring reliability in production. Unlike synchronous code, queue jobs run asynchronously, handle retries, and interact with external systems. Effective testing verifies job logic, error handling, retry behavior, and integration with the queue system. The tests below use the in-memory driver and call process() explicitly; the sync driver is an alternative that gives immediate feedback without background workers.

dart
// Import test framework and queue classes
import 'package:test/test.dart';
import 'package:khadem/src/core/queue/index.dart';
import 'package:khadem/src/contracts/queue/queue_job.dart';

// Unit tests for queue jobs
void main() {
  group('QueueJob Tests', () {
    late QueueManager queueManager;
    late InMemoryDriver driver;

    setUp(() {
      // Use memory driver for testing
      driver = InMemoryDriver(
        config: DriverConfig(name: 'test'),
      );
      
      final config = Config(); // Mock config
      queueManager = QueueManager(config, driver: driver);
    });

    test('Simple job executes successfully', () async {
      var executed = false;
      
      final job = SimpleJob(() => executed = true);
      await queueManager.dispatch(job);

      // Process the job
      await queueManager.process();

      expect(executed, isTrue);
      expect(driver.isEmpty, isTrue);
    });

    test('Job with retry logic', () async {
      var executionCount = 0;
      
      final job = RetryableJob(() {
        executionCount++;
        if (executionCount < 3) throw Exception('Retry me');
      });
      
      await queueManager.dispatch(job);

      // Process multiple times - automatic retry happens
      await queueManager.process(); // Fail 1, auto-retry
      await Future.delayed(Duration(milliseconds: 100));
      await queueManager.process(); // Fail 2, auto-retry
      await Future.delayed(Duration(milliseconds: 100));
      await queueManager.process(); // Success

      expect(executionCount, equals(3));
    });

    test('Delayed job execution', () async {
      var executed = false;
      final job = SimpleJob(() => executed = true);
      final delay = Duration(milliseconds: 500);

      await queueManager.dispatch(job, delay: delay);

      // Job should not execute immediately
      await queueManager.process();
      expect(executed, isFalse);

      // Wait for delay and process again
      await Future.delayed(delay + Duration(milliseconds: 100));
      await queueManager.process();
      expect(executed, isTrue);
    });

    test('Batch job processing', () async {
      final executed = <int>[];
      final jobs = List.generate(
        3, 
        (i) => SimpleJob(() => executed.add(i)),
      );

      await queueManager.dispatchBatch(jobs);

      // Process all jobs
      for (var i = 0; i < jobs.length; i++) {
        await queueManager.process();
      }

      expect(executed.length, equals(3));
      expect(driver.isEmpty, isTrue);
    });

    test('Worker processes jobs correctly', () async {
      final executed = <int>[];
      final jobs = List.generate(
        5, 
        (i) => SimpleJob(() => executed.add(i)),
      );
      
      for (final job in jobs) {
        await queueManager.dispatch(job);
      }

      // Start worker with short delay
      await queueManager.startWorker(
        maxJobs: 5,
        delay: Duration(milliseconds: 10),
      );

      // Wait for all jobs to complete
      await Future.delayed(Duration(milliseconds: 200));

      expect(executed.length, equals(5));
      expect(driver.isEmpty, isTrue);
    });

    test('Queue statistics', () async {
      final job = SimpleJob(() {});
      await queueManager.dispatch(job);

      final stats = await driver.getStats();
      
      expect(stats['queue_depth'], equals(1));
      expect(stats['ready_jobs'], equals(1));
      expect(stats['delayed_jobs'], equals(0));
    });

    test('Metrics tracking', () async {
      // Enable metrics
      final metricsDriver = InMemoryDriver(
        config: DriverConfig(
          name: 'test',
          trackMetrics: true,
        ),
        metrics: QueueMetrics(),
      );
      
      final qm = QueueManager(Config(), driver: metricsDriver);

      await qm.dispatch(SimpleJob(() {}));
      await qm.process();

      final stats = await metricsDriver.getStats();
      final metrics = stats['metrics'] as Map<String, dynamic>;
      
      expect(metrics['jobs_queued'], equals(1));
      expect(metrics['jobs_completed'], equals(1));
      expect(metrics['success_rate'], equals(1.0));
    });
  });
}

// Test job implementations
class SimpleJob extends QueueJob {
  final void Function() action;

  SimpleJob(this.action);

  @override
  Future<void> handle() async {
    await Future.delayed(Duration(milliseconds: 10)); // Simulate work
    action();
  }

  @override
  String get displayName => 'Simple Test Job';

  @override
  int get maxRetries => 3;
}

class RetryableJob extends QueueJob {
  final void Function() action;

  RetryableJob(this.action);

  @override
  Future<void> handle() async {
    await Future.delayed(Duration(milliseconds: 10));
    action(); // May throw exception
  }

  @override
  String get displayName => 'Retryable Job';

  @override
  int get maxRetries => 5;
}

🧪 Testing Strategies

  • Unit Tests: Test handle() in isolation
  • Integration Tests: Test with real queue drivers
  • Mock Dependencies: Mock external APIs/services
  • Test Failures: Verify error handling paths
  • Test Retries: Ensure retry logic works
  • Test Serialization: For persistent drivers

✅ What to Test

  • Job executes successfully with valid input
  • Job handles invalid/missing data gracefully
  • External service failures trigger retries
  • Job reaches max retries and fails permanently
  • Job serialization roundtrip works correctly
  • Delayed jobs execute at the right time
  • Batch jobs handle partial failures

Testing with Sync Driver: For tests, use the sync driver which executes jobs immediately in the current thread. This makes tests deterministic and fast. Set up a test configuration with 'driver': 'sync', then dispatch jobs and assert on the results immediately. No need to start workers or wait for async execution.

💡 Testing Best Practices

  • Use Sync Driver: Configure queue with sync driver for immediate execution
  • Mock External Services: Don't call real APIs, email services, or payment gateways in tests
  • Test Idempotency: Run jobs multiple times to ensure they're retry-safe
  • Verify Side Effects: Check database changes, file creation, etc.
  • Test Edge Cases: Null values, empty lists, network timeouts, etc.
  • Measure Coverage: Aim for high coverage of job logic paths

Troubleshooting

Queue systems can experience various issues from configuration problems to resource constraints. This section covers common problems you might encounter and how to diagnose and fix them. Most issues fall into a few categories: worker configuration, driver setup, job design, or resource limits.

🚨 Critical Issues

Jobs not processing at all:
  • Check if workers are running: No worker = no job execution
  • Verify driver configuration is correct
  • Ensure await queueManager.startWorker() is called
  • Check logs for worker startup errors
Jobs lost after application restart:
  • Memory driver loses all jobs on restart
  • Solution: Switch to file or Redis driver for persistence
  • Update config: 'driver': 'file'
Worker crashes repeatedly:
  • Add try-catch in job handle() methods
  • Implement onError callback to catch worker-level errors
  • Check for unhandled exceptions in job code
  • Review system resources (memory, disk space)

⚠️ Common Issues

High memory usage:
  • Monitor job payload sizes - keep them small
  • Set maxJobs limit to prevent unbounded growth
  • Avoid storing large objects in job properties
  • Use IDs and load data in handle() instead
Jobs timing out:
  • Increase worker timeout parameter
  • Optimize job handle() method performance
  • Break large jobs into smaller chunks
  • Check for slow external API calls
Delayed jobs not executing on time:
  • Verify system clock is accurate
  • Check worker delay isn't too long
  • Ensure workers are running continuously
  • Look for timezone issues in delay calculations
Jobs failing with "not registered" error:
  • For persistent drivers: Register jobs with QueueJobRegistry
  • Call registerQueueJobs() before starting queue
  • Verify job type name matches registration
  • See: Job Serialization Guide
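
For the memory and timeout issues above, a common remedy is to dispatch several small jobs that each carry a short list of IDs instead of one job holding every record. The sketch below shows only the splitting step. `chunk` is a hypothetical helper, not a Khadem API.

```dart
/// Hypothetical helper (not a Khadem API): splits [items] into consecutive
/// lists of at most [size] elements, preserving order.
List<List<T>> chunk<T>(List<T> items, int size) {
  if (size < 1) {
    throw ArgumentError.value(size, 'size', 'must be >= 1');
  }
  final chunks = <List<T>>[];
  for (var start = 0; start < items.length; start += size) {
    final end = start + size < items.length ? start + size : items.length;
    chunks.add(items.sublist(start, end));
  }
  return chunks;
}
```

A newsletter send could, for example, split the subscriber IDs into chunks and dispatch one job per chunk, with each job loading its own users inside handle(). A failure then retries only the affected chunk, and every payload stays small.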

πŸ” Performance Issues

Slow job processing:
  • β€’ Reduce worker delay for faster processing
  • β€’ Run multiple workers in parallel
  • β€’ Profile job handle() methods for bottlenecks
  • β€’ Optimize database queries and API calls
Queue growing faster than processing:
  • β€’ Scale up: Run more workers
  • β€’ Optimize job execution time
  • β€’ Consider horizontal scaling (multiple servers)
  • β€’ Use Redis driver for distributed processing

πŸ› οΈ Debugging Tools

dart
// Debug queue status
Future<void> debugQueueStatus(QueueManager queueManager) async {
  final stats = await queueManager.driver.getStats();

  print('πŸ” Queue Debug Information:');
  print('Driver: ${queueManager.defaultDriverName}');
  print('Queue Depth: ${stats['queue_depth']}');
  print('Processing: ${stats['processing_count']}');
  print('Ready Jobs: ${stats['ready_jobs']}');
  print('Delayed Jobs: ${stats['delayed_jobs']}');

  // Metrics (if enabled)
  if (stats.containsKey('metrics')) {
    final metrics = stats['metrics'] as Map<String, dynamic>;
    print('\nMetrics:');
    print('  Total Queued: ${metrics['jobs_queued']}');
    print('  Completed: ${metrics['jobs_completed']}');
    print('  Failed: ${metrics['jobs_failed']}');
    print('  Success Rate: ${(metrics['success_rate'] * 100).toStringAsFixed(1)}%');

    // Jobs by type
    final byType = metrics['jobs_by_type'] as Map<String, dynamic>;
    if (byType.isNotEmpty) {
      print('\nJobs by Type:');
      byType.forEach((type, count) {
        print('  $type: $count');
      });
    }
  }
}

// Inspect in-memory queue
Future<void> inspectMemoryQueue(QueueManager queueManager) async {
  final driver = queueManager.driver;
  
  if (driver is! InMemoryDriver) {
    print('❌ Driver is not InMemoryDriver');
    return;
  }

  print('📊 Memory Queue Contents:');
  print('Pending Jobs: ${driver.pendingJobsCount}');
  print('Is Empty: ${driver.isEmpty}');

  final stats = await driver.getStats();
  print('Ready: ${stats['ready_jobs']}');
  print('Delayed: ${stats['delayed_jobs']}');
}

// Inspect file queue contents
Future<void> inspectFileQueue(QueueManager queueManager) async {
  final driver = queueManager.driver;
  
  if (driver is! FileStorageDriver) {
    print('❌ Driver is not FileStorageDriver');
    return;
  }

  final stats = await driver.getStats();
  // Cast element-wise; a decoded list is often List<dynamic>, not List<String>
  final files = (stats['storage_files'] as List?)?.cast<String>();
  
  print('πŸ“ File Queue Contents:');
  print('Queue Depth: ${stats['queue_depth']}');
  print('Storage Files:');
  files?.forEach((file) => print('  - $file'));
}

// Clear queue (for testing/debugging)
Future<void> clearQueue(QueueManager queueManager) async {
  await queueManager.driver.clear();
  print('🧹 Queue cleared');
  
  final stats = await queueManager.driver.getStats();
  print('Queue Depth: ${stats['queue_depth']}');
}

✅ Prevention Tips

  • Set up comprehensive logging and monitoring from day one
  • Implement health checks that verify workers are running
  • Add alerts for high failure rates or queue growth
  • Test job error handling and retry logic thoroughly
  • Use persistent drivers (file/Redis) in production
  • Document job requirements and dependencies
  • Keep job payloads small and focused
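
The alerting tip above can be implemented as a small check over the stats map that the debugging helpers already read. This is a sketch under assumptions: `queueAlerts` is a hypothetical function, not a Khadem API, the thresholds are arbitrary, and the `queue_depth` and `metrics.success_rate` keys are taken from the debugging examples in this section.

```dart
/// Hypothetical health check (not a Khadem API). Returns one message per
/// threshold that the given driver stats violate; an empty list means OK.
List<String> queueAlerts(
  Map<String, dynamic> stats, {
  int maxQueueDepth = 1000,
  double minSuccessRate = 0.95,
}) {
  final alerts = <String>[];

  final depth = stats['queue_depth'];
  if (depth is int && depth > maxQueueDepth) {
    alerts.add('Queue depth $depth exceeds $maxQueueDepth');
  }

  // Metrics are only present when tracking is enabled on the driver.
  final metrics = stats['metrics'];
  if (metrics is Map) {
    final rate = metrics['success_rate'];
    if (rate is num && rate < minSuccessRate) {
      alerts.add('Success rate ${(rate * 100).toStringAsFixed(1)}% is below '
          '${(minSuccessRate * 100).toStringAsFixed(1)}%');
    }
  }
  return alerts;
}
```

A periodic task could call this with the result of `getStats()` and forward any returned messages to whatever alerting service the application already uses.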
