Task Scheduler

A lightweight and efficient task scheduling system for running background jobs and automated tasks in your Khadem application.

Background JobsInterval SchedulingTask MonitoringRetry Logic

System Overview

Scheduler Architecture

The Khadem Task Scheduler provides a simple yet powerful system for scheduling and executing background tasks with built-in monitoring, retry logic, and error handling.

Core Features

  • Duration-based task scheduling
  • Built-in retry mechanism
  • Task state management
  • Real-time task monitoring
  • Graceful error handling

Key Components

  • SchedulerEngine - Main scheduler
  • ScheduledTask - Task representation
  • ScheduledJob - Job interface
  • Job Registry - Job management
  • Task Statistics - Monitoring

Important Note

The Khadem scheduler uses Duration-based intervals, not cron expressions. Tasks should be designed to be idempotent and handle failures gracefully.

Getting Started

dart
// In your main.dart or bootstrap file
import 'package:khadem/khadem.dart';
import 'package:khadem/src/core/scheduler/scheduler_bootstrap.dart';

void main() async {
  // Initialize Khadem
  await Khadem.initialize();

  // Start the scheduler system
  startSchedulers();

  print('πŸš€ Application started with scheduler');

  // Graceful shutdown
  ProcessSignal.sigint.watch().listen((_) {
    print('πŸ›‘ Shutting down...');
    stopSchedulers();
    exit(0);
  });
}

Quick Start

1. Create a Job

dart
import 'package:khadem/khadem.dart';
import 'package:khadem/src/contracts/scheduler/scheduled_job.dart';

class EmailNotificationJob extends ScheduledJob {
  @override
  String get name => 'email_notifications';

  @override
  Future<void> execute() async {
    try {
      // Your email sending logic here
      final pendingEmails = await Database.table('pending_emails')
          .where('sent', false)
          .limit(10) // Process in batches
          .get();

      for (final email in pendingEmails) {
        await sendEmail(
          email['recipient'],
          email['subject'],
          email['body']
        );

        // Mark as sent
        await Database.table('pending_emails')
            .where('id', email['id'])
            .update({'sent': true, 'sent_at': DateTime.now()});
      }

      Khadem.logger.info('βœ… Processed ${pendingEmails.length} emails');
    } catch (e) {
      Khadem.logger.error('❌ Email job failed: $e');
      rethrow; // Let scheduler handle retry
    }
  }
}

2. Create and Schedule a Task

dart
import 'package:khadem/src/core/scheduler/core/scheduled_task.dart';
import 'package:khadem/src/core/scheduler/scheduler.dart';

void setupScheduledTasks() {
  final scheduler = SchedulerEngine();

  // Create a task that runs every 5 minutes
  final emailTask = ScheduledTask(
    name: 'send_pending_emails',
    interval: Duration(minutes: 5),
    job: EmailNotificationJob(),
    retryOnFail: true,
    maxRetries: 3,
  );

  // Create a cleanup task that runs every hour
  final cleanupTask = ScheduledTask(
    name: 'cleanup_temp_files',
    interval: Duration(hours: 1),
    job: TempFileCleanupJob(),
    retryOnFail: false,
  );

  // Add tasks to scheduler
  scheduler.add(emailTask);
  scheduler.add(cleanupTask);

  Khadem.logger.info('βœ… Scheduled tasks initialized');
}

Task Intervals

dart
// Duration-based scheduling examples
class DurationExamples {
  static void examples() {
    // Run every 30 seconds
    final healthCheck = ScheduledTask(
      name: 'health_check',
      interval: Duration(seconds: 30),
      job: PingJob(),
    );

    // Run every 5 minutes
    final cacheCleanup = ScheduledTask(
      name: 'cache_cleanup',
      interval: Duration(minutes: 5),
      job: CacheCleanupJob(),
    );

    // Run every hour
    final backupTask = ScheduledTask(
      name: 'database_backup',
      interval: Duration(hours: 1),
      job: DatabaseBackupJob(),
    );

    // Run daily (24 hours)
    final dailyReport = ScheduledTask(
      name: 'daily_report',
      interval: Duration(hours: 24),
      job: DailyReportJob(),
    );

    // Run weekly (7 days)
    final weeklyCleanup = ScheduledTask(
      name: 'weekly_cleanup',
      interval: Duration(days: 7),
      job: WeeklyCleanupJob(),
    );

    // Custom duration
    final customTask = ScheduledTask(
      name: 'custom_task',
      interval: Duration(hours: 2, minutes: 30), // 2.5 hours
      job: CustomJob(),
    );
  }
}

Common Duration Patterns

Use CaseDurationExample
Health Check30 secondsDuration(seconds: 30)
Cache Cleanup5 minutesDuration(minutes: 5)
Database Backup1 hourDuration(hours: 1)
Weekly Report7 daysDuration(days: 7)
Monthly Cleanup30 daysDuration(days: 30)

Task Types

Recurring Tasks

dart
// Recurring task example
class DatabaseMaintenanceJob extends ScheduledJob {
  @override
  String get name => 'database_maintenance';

  @override
  Future<void> execute() async {
    // Perform database maintenance tasks
    await Database.raw('OPTIMIZE TABLE users, posts, comments');

    // Clean up old logs
    await Database.table('audit_logs')
        .where('created_at', '<', DateTime.now().subtract(Duration(days: 30)))
        .delete();

    // Update statistics
    await Database.raw('ANALYZE TABLE users, posts');

    Khadem.logger.info('βœ… Database maintenance completed');
  }
}

// Create recurring task
final maintenanceTask = ScheduledTask(
  name: 'db_maintenance',
  interval: Duration(hours: 6), // Every 6 hours
  job: DatabaseMaintenanceJob(),
  retryOnFail: true,
  maxRetries: 2,
);

scheduler.add(maintenanceTask);

Tasks that run repeatedly at specified intervals until stopped.

One-time Tasks

dart
// One-time task example
class DataMigrationJob extends ScheduledJob {
  @override
  String get name => 'data_migration';

  @override
  Future<void> execute() async {
    // Perform one-time data migration
    final users = await Database.table('old_users').get();

    for (final user in users) {
      await Database.table('users').insert({
        'name': user['full_name'],
        'email': user['email_address'],
        'created_at': user['registration_date'],
        'migrated_at': DateTime.now(),
      });
    }

    // Mark migration as complete
    await Database.table('migrations').insert({
      'name': 'migrate_old_users',
      'completed_at': DateTime.now(),
    });

    Khadem.logger.info('βœ… Data migration completed');
  }
}

// Create one-time task
final migrationTask = ScheduledTask(
  name: 'user_migration',
  interval: Duration(seconds: 1), // Run immediately
  job: DataMigrationJob(),
  runOnce: true, // Stop after first execution
);

scheduler.add(migrationTask);

Tasks that execute only once and then automatically stop.

Built-in Jobs

Ping Job

dart
import 'package:khadem/khadem.dart';
import 'package:khadem/src/contracts/scheduler/scheduled_job.dart';

class PingJob extends ScheduledJob {
  @override
  String get name => 'ping';

  @override
  Future<void> execute() async {
    Khadem.logger.debug('πŸ“‘ Ping at ${DateTime.now()}');
    // Add your custom ping logic here
  }
}

A simple job that logs a ping message. Useful for testing the scheduler or keeping services alive.

Usage:job: 'ping'

TTL Cache Cleaner

dart
import 'dart:io';
import 'package:khadem/khadem.dart';
import 'package:khadem/src/contracts/scheduler/scheduled_job.dart';

class TTLFileCleanerJob extends ScheduledJob {
  @override
  String get name => 'ttl_cache_cleaner';

  final String cachePath;

  TTLFileCleanerJob({this.cachePath = 'storage/cache'});

  @override
  Future<void> execute() async {
    final dir = Directory(cachePath);
    if (!dir.existsSync()) return;

    final files = dir.listSync();
    int cleanedCount = 0;

    for (final file in files) {
      if (file is File) {
        try {
          final content = await file.readAsString();
          final data = jsonDecode(content);

          final ttl = data['ttl'];
          final createdAt = DateTime.parse(data['created_at'] ?? DateTime.now().toIso8601String());

          if (ttl != null && DateTime.now().isAfter(createdAt.add(Duration(seconds: ttl)))) {
            await file.delete();
            cleanedCount++;
          }
        } catch (e) {
          // Skip invalid files
          Khadem.logger.warning('⚠️ Could not process cache file ${file.path}: $e');
        }
      }
    }

    if (cleanedCount > 0) {
      Khadem.logger.info('🧹 Cleaned $cleanedCount expired cache files');
    }
  }
}

Automatically removes expired cache files based on TTL values.

Usage:job: 'ttl_cleaner'
Config:cachePath: 'storage/cache'

Configuration

dart
// config/app.json
{
  "scheduler": {
    "tasks": [
      {
        "name": "health_check",
        "interval": 30,
        "job": "ping",
        "retryOnFail": false
      },
      {
        "name": "cache_cleanup",
        "interval": 300,
        "job": "ttl_cleaner",
        "cachePath": "storage/cache",
        "retryOnFail": true,
        "maxRetries": 3
      },
      {
        "name": "email_notifications",
        "interval": 60,
        "job": "email_notifications",
        "retryOnFail": true,
        "maxRetries": 5
      },
      {
        "name": "database_backup",
        "interval": 3600,
        "job": "database_backup",
        "retryOnFail": true,
        "maxRetries": 2
      }
    ]
  }
}

// Tasks will be automatically loaded when calling startSchedulers()

Task Configuration Options

Required
  • name - Unique task identifier
  • interval - Execution interval in seconds
  • job - Job name to execute
Optional
  • timezone - Timezone (default: UTC)
  • retryOnFail - Retry on failure
  • runOnce - Run only once
  • maxRetries - Max retry attempts

Task Monitoring

dart
// Task monitoring and control
class TaskMonitor {
  final SchedulerEngine _scheduler;

  TaskMonitor(this._scheduler);

  void printTaskStatus() {
    final stats = _scheduler.getStats();

    print('\nπŸ“Š Task Status Report:');
    print('=' * 50);

    for (final entry in stats.entries) {
      final taskStats = entry.value;
      print('\nπŸ“‹ Task: ${taskStats.name}');
      print('   Status: ${taskStats.status}');
      print('   Last Run: ${taskStats.lastRun ?? 'Never'}');
      print('   Next Run: ${taskStats.nextRun ?? 'Not scheduled'}');
      print('   Success Count: ${taskStats.successCount}');
      print('   Failure Count: ${taskStats.failureCount}');
      print('   Avg Execution Time: ${taskStats.averageExecutionTime.toStringAsFixed(2)}ms');
    }
  }

  void pauseFailingTasks() {
    final stats = _scheduler.getStats();

    for (final entry in stats.entries) {
      final taskStats = entry.value;
      if (taskStats.failureCount > 3) {
        _scheduler.pause(taskStats.name);
        Khadem.logger.warning('⏸️ Paused failing task: ${taskStats.name}');
      }
    }
  }

  List<String> getActiveTasks() {
    return _scheduler.activeTasks();
  }

  bool isTaskRunning(String taskName) {
    return _scheduler.isRunning(taskName);
  }
}

// Usage
final monitor = TaskMonitor(scheduler);

// Check status every 5 minutes
final statusTask = ScheduledTask(
  name: 'status_monitor',
  interval: Duration(minutes: 5),
  job: StatusMonitorJob(monitor),
);

scheduler.add(statusTask);

Task Statistics

  • Execution count (success/failure)
  • Last execution time
  • Next scheduled run
  • Average execution time
  • Current task status
  • Retry count and history

Error Handling & Retry

dart
// Error handling with retry logic
class RetryableJob extends ScheduledJob {
  @override
  String get name => 'retryable_job';

  @override
  Future<void> execute() async {
    try {
      // Your business logic here
      await performUnreliableOperation();

      Khadem.logger.info('βœ… Job completed successfully');
    } catch (e) {
      Khadem.logger.error('❌ Job failed: $e');

      // The scheduler will automatically handle retries
      // based on the task's retryOnFail and maxRetries settings
      rethrow;
    }
  }
}

// Create task with retry configuration
final retryableTask = ScheduledTask(
  name: 'unreliable_task',
  interval: Duration(minutes: 10),
  job: RetryableJob(),
  retryOnFail: true,      // Enable retries
  maxRetries: 3,          // Maximum 3 retry attempts
);

// Exponential backoff is automatic:
// Attempt 1: immediate retry
// Attempt 2: 5 seconds delay
// Attempt 3: 10 seconds delay
// Attempt 4: 15 seconds delay

scheduler.add(retryableTask);

// Custom error handling job
class ErrorHandlingJob extends ScheduledJob {
  @override
  String get name => 'error_handler';

  @override
  Future<void> execute() async {
    try {
      await riskyOperation();
    } on NetworkException catch (e) {
      // Handle network errors specifically
      Khadem.logger.warning('Network error, will retry: $e');
      rethrow;
    } on DatabaseException catch (e) {
      // Handle database errors
      Khadem.logger.error('Database error: $e');
      // Don't retry for database errors
      throw PermanentFailureException('Database unavailable');
    } catch (e) {
      // Handle other errors
      Khadem.logger.error('Unexpected error: $e');
      rethrow;
    }
  }
}

Retry Behavior

  • Exponential backoff: 5s, 10s, 15s (configurable)
  • Maximum retry attempts (default: 3)
  • Automatic failure logging
  • Task status updates on failure
  • Graceful handling of permanent failures

Best Practices

βœ… Do's

  • Design jobs to be idempotent (safe to run multiple times)
  • Use descriptive task names for easy identification
  • Implement proper error handling in job execute() methods
  • Monitor task execution and handle failures appropriately
  • Use configuration files for task definitions in production
  • Test jobs thoroughly before deploying to production
  • Log important events and errors within jobs
  • Use appropriate intervals based on task requirements

❌ Don'ts

  • Don't perform long-running operations without proper timeouts
  • Don't rely on task execution order or timing precision
  • Don't store large amounts of data in task configurations
  • Don't ignore errors or use empty catch blocks
  • Don't create tasks with very short intervals unnecessarily
  • Don't use blocking operations in job implementations
  • Don't forget to handle job cleanup and resource disposal
  • Don't use the scheduler for real-time or time-critical operations

API Reference

SchedulerEngine

dart
class SchedulerEngine implements SchedulerEngineContract {
  // Core methods
  void add(ScheduledTask task)           // Add a new task
  void stop(String name)                 // Stop a specific task
  void stopAll()                         // Stop all tasks
  bool isRunning(String name)            // Check if task is running
  List<String> activeTasks()             // Get active task names
  void pause(String name)                // Pause a task
  void resume(String name)               // Resume a paused task
  Map<String, TaskStats> getStats()      // Get all task statistics

  // Utility methods
  ScheduledTask? getTask(String name)    // Get task by name
  List<String> getTaskNames()            // Get all task names
  bool hasTask(String name)              // Check if task exists
  int get taskCount                      // Get total task count
  void remove(String name)               // Remove a task
  void clear()                           // Remove all tasks
}

ScheduledTask

dart
class ScheduledTask {
  // Constructor
  ScheduledTask({
    required String name,              // Unique task name
    required Duration interval,        // Execution interval
    required ScheduledJob job,         // Job to execute
    String timeZone = 'UTC',           // Timezone (currently not used)
    bool retryOnFail = false,          // Retry on failure
    bool runOnce = false,              // Run only once
    int maxRetries = 3,                // Maximum retry attempts
  })

  // Properties
  final String name
  final Duration interval
  final ScheduledJob job
  final String timeZone
  final bool retryOnFail
  final bool runOnce
  final int maxRetries

  // Methods
  void start(Function(Duration) scheduleNext)    // Start the task
  void pause()                                   // Pause execution
  void resume(Function(Duration) scheduleNext)   // Resume execution
  Future<void> run(Function(Duration) scheduleNext) // Execute job
  void stop()                                    // Stop permanently
  TaskStats get stats                           // Get current statistics

  // Factory constructor
  factory ScheduledTask.fromConfig(Map<String, dynamic> config)
}

ScheduledJob Interface

dart
abstract class ScheduledJob {
  // Required property
  String get name;                    // Unique job identifier

  // Required method
  Future<void> execute();             // Business logic implementation
}

// Example implementation
class MyCustomJob extends ScheduledJob {
  @override
  String get name => 'my_custom_job';

  @override
  Future<void> execute() async {
    // Your custom logic here
    print('Custom job executed at ${DateTime.now()}');
  }
}

On this page