Task Scheduler
A lightweight and efficient task scheduling system for running background jobs and automated tasks in your Khadem application.
Background Jobs · Interval Scheduling · Task Monitoring · Retry Logic
System Overview
Scheduler Architecture
The Khadem Task Scheduler provides a simple yet powerful system for scheduling and executing background tasks with built-in monitoring, retry logic, and error handling.
Core Features
- Duration-based task scheduling
- Built-in retry mechanism
- Task state management
- Real-time task monitoring
- Graceful error handling
Key Components
- SchedulerEngine - Main scheduler
- ScheduledTask - Task representation
- ScheduledJob - Job interface
- Job Registry - Job management
- Task Statistics - Monitoring
Important Note
The Khadem scheduler uses Duration-based intervals, not cron expressions. Tasks should be designed to be idempotent and handle failures gracefully.
Getting Started
dart
// In your main.dart or bootstrap file
import 'package:khadem/khadem.dart';
import 'package:khadem/src/core/scheduler/scheduler_bootstrap.dart';
/// Application entry point: boots Khadem, starts the scheduler, and stops it
/// again when the process receives SIGINT (Ctrl+C).
///
/// `ProcessSignal` and `exit` are provided by `dart:io`, so that library has
/// to be imported next to the Khadem imports.
Future<void> main() async {
  // Initialize Khadem
  await Khadem.initialize();

  // Start the scheduler system
  startSchedulers();
  print('🚀 Application started with scheduler');

  // Graceful shutdown: stop the scheduler before terminating the process.
  ProcessSignal.sigint.watch().listen((_) {
    print('🛑 Shutting down...');
    stopSchedulers();
    exit(0);
  });
}
Quick Start
1. Create a Job
dart
import 'package:khadem/khadem.dart';
import 'package:khadem/src/contracts/scheduler/scheduled_job.dart';
/// Scheduled job that delivers queued e-mails in small batches.
///
/// Each run loads up to 10 rows from `pending_emails` whose `sent` column is
/// `false`, sends them one by one, and flags every row as sent right after
/// its delivery. Failures are logged and rethrown so the scheduler can apply
/// the task's retry settings.
class EmailNotificationJob extends ScheduledJob {
  @override
  String get name => 'email_notifications';

  @override
  Future<void> execute() async {
    try {
      // Your email sending logic here
      final pendingEmails = await Database.table('pending_emails')
          .where('sent', false)
          .limit(10) // Process in batches
          .get();

      for (final email in pendingEmails) {
        await sendEmail(
          email['recipient'],
          email['subject'],
          email['body'],
        );

        // Mark as sent immediately, so a later failure in this batch does
        // not leave already-delivered rows flagged as pending.
        await Database.table('pending_emails')
            .where('id', email['id'])
            .update({'sent': true, 'sent_at': DateTime.now()});
      }

      Khadem.logger.info('✅ Processed ${pendingEmails.length} emails');
    } catch (e) {
      Khadem.logger.error('❌ Email job failed: $e');
      rethrow; // Let scheduler handle retry
    }
  }
}
2. Create and Schedule a Task
dart
import 'package:khadem/src/core/scheduler/core/scheduled_task.dart';
import 'package:khadem/src/core/scheduler/scheduler.dart';
/// Creates a [SchedulerEngine] and registers the e-mail and cleanup tasks.
///
/// The e-mail task runs every 5 minutes with up to 3 retries; the cleanup
/// task runs hourly with retries disabled.
void setupScheduledTasks() {
  final scheduler = SchedulerEngine();

  // Create a task that runs every 5 minutes
  final emailTask = ScheduledTask(
    name: 'send_pending_emails',
    interval: Duration(minutes: 5),
    job: EmailNotificationJob(),
    retryOnFail: true,
    maxRetries: 3,
  );

  // Create a cleanup task that runs every hour
  final cleanupTask = ScheduledTask(
    name: 'cleanup_temp_files',
    interval: Duration(hours: 1),
    job: TempFileCleanupJob(),
    retryOnFail: false,
  );

  // Add tasks to scheduler
  scheduler.add(emailTask);
  scheduler.add(cleanupTask);

  Khadem.logger.info('✅ Scheduled tasks initialized');
}
Task Intervals
dart
// Duration-based scheduling examples
/// Sample [ScheduledTask] definitions covering common interval lengths.
class DurationExamples {
  /// Builds one task per interval pattern.
  ///
  /// The tasks are only constructed for illustration; none of them is added
  /// to a scheduler here.
  static void examples() {
    // 30-second cadence
    final pingTask = ScheduledTask(
      name: 'health_check',
      job: PingJob(),
      interval: Duration(seconds: 30),
    );

    // 5-minute cadence
    final cacheTask = ScheduledTask(
      name: 'cache_cleanup',
      job: CacheCleanupJob(),
      interval: Duration(minutes: 5),
    );

    // Hourly cadence
    final hourlyBackup = ScheduledTask(
      name: 'database_backup',
      job: DatabaseBackupJob(),
      interval: Duration(hours: 1),
    );

    // Daily cadence, expressed as 24 hours
    final reportTask = ScheduledTask(
      name: 'daily_report',
      job: DailyReportJob(),
      interval: Duration(hours: 24),
    );

    // Weekly cadence, expressed as 7 days
    final weeklyTask = ScheduledTask(
      name: 'weekly_cleanup',
      job: WeeklyCleanupJob(),
      interval: Duration(days: 7),
    );

    // Mixed units: 2 hours 30 minutes (2.5 hours)
    final mixedUnitTask = ScheduledTask(
      name: 'custom_task',
      job: CustomJob(),
      interval: Duration(hours: 2, minutes: 30),
    );
  }
}
Common Duration Patterns
| Use Case | Duration | Example |
|---|---|---|
| Health Check | 30 seconds | Duration(seconds: 30) |
| Cache Cleanup | 5 minutes | Duration(minutes: 5) |
| Database Backup | 1 hour | Duration(hours: 1) |
| Weekly Report | 7 days | Duration(days: 7) |
| Monthly Cleanup | 30 days | Duration(days: 30) |
Task Types
Recurring Tasks
dart
// Recurring task example
/// Scheduled job that runs routine database maintenance.
///
/// One run issues an `OPTIMIZE TABLE` statement, deletes `audit_logs` rows
/// whose `created_at` is more than 30 days old, and then issues an
/// `ANALYZE TABLE` statement.
class DatabaseMaintenanceJob extends ScheduledJob {
  @override
  String get name => 'database_maintenance';

  @override
  Future<void> execute() async {
    // Perform database maintenance tasks
    await Database.raw('OPTIMIZE TABLE users, posts, comments');

    // Clean up old logs
    await Database.table('audit_logs')
        .where('created_at', '<', DateTime.now().subtract(Duration(days: 30)))
        .delete();

    // Update statistics
    await Database.raw('ANALYZE TABLE users, posts');

    Khadem.logger.info('✅ Database maintenance completed');
  }
}
// Register the maintenance job as a recurring task: it fires every 6 hours,
// with retries enabled and capped at 2.
final maintenanceTask = ScheduledTask(
  name: 'db_maintenance',
  job: DatabaseMaintenanceJob(),
  interval: Duration(hours: 6),
  retryOnFail: true,
  maxRetries: 2,
);
scheduler.add(maintenanceTask);
Tasks that run repeatedly at specified intervals until stopped.
One-time Tasks
dart
// One-time task example
/// One-off job that copies every row of `old_users` into `users`.
///
/// Column names are mapped (`full_name` → `name`, `email_address` → `email`,
/// `registration_date` → `created_at`) and each copied row is stamped with
/// `migrated_at`. After all rows are inserted, a `migrate_old_users` record
/// is written to the `migrations` table.
class DataMigrationJob extends ScheduledJob {
  @override
  String get name => 'data_migration';

  @override
  Future<void> execute() async {
    // Perform one-time data migration
    final users = await Database.table('old_users').get();

    for (final user in users) {
      await Database.table('users').insert({
        'name': user['full_name'],
        'email': user['email_address'],
        'created_at': user['registration_date'],
        'migrated_at': DateTime.now(),
      });
    }

    // Mark migration as complete
    await Database.table('migrations').insert({
      'name': 'migrate_old_users',
      'completed_at': DateTime.now(),
    });

    Khadem.logger.info('✅ Data migration completed');
  }
}
// Register the migration as a one-shot task: `runOnce` stops it after the
// first execution, and the 1-second interval is the shortest wait used here
// to make that run start right away.
final migrationTask = ScheduledTask(
  name: 'user_migration',
  job: DataMigrationJob(),
  interval: Duration(seconds: 1),
  runOnce: true,
);
scheduler.add(migrationTask);
Tasks that execute only once and then automatically stop.
Built-in Jobs
Ping Job
dart
import 'package:khadem/khadem.dart';
import 'package:khadem/src/contracts/scheduler/scheduled_job.dart';
/// Minimal job that writes a timestamped debug log entry on every run.
class PingJob extends ScheduledJob {
  @override
  String get name => 'ping';

  @override
  Future<void> execute() async {
    Khadem.logger.debug('📡 Ping at ${DateTime.now()}');
    // Add your custom ping logic here
  }
}
A simple job that logs a ping message. Useful for testing the scheduler or keeping services alive.
Usage:
job: 'ping'
TTL Cache Cleaner
dart
import 'dart:convert';
import 'dart:io';
import 'package:khadem/khadem.dart';
import 'package:khadem/src/contracts/scheduler/scheduled_job.dart';
/// Scheduled job that deletes expired cache files from [cachePath].
///
/// Every regular file directly inside the directory is read as JSON. A file
/// is removed when it carries a non-null `ttl` (in seconds) and
/// `created_at + ttl` lies in the past; when `created_at` is missing, the
/// current time is used in its place. Files that cannot be read or parsed
/// are skipped with a warning.
///
/// NOTE(review): [name] is 'ttl_cache_cleaner' while the usage and config
/// examples refer to this job as 'ttl_cleaner' — confirm which key the job
/// registry expects.
class TTLFileCleanerJob extends ScheduledJob {
  TTLFileCleanerJob({this.cachePath = 'storage/cache'});

  /// Directory that holds the cache files to inspect.
  final String cachePath;

  @override
  String get name => 'ttl_cache_cleaner';

  @override
  Future<void> execute() async {
    final dir = Directory(cachePath);
    // Use the asynchronous file-system API so a large cache directory does
    // not block other scheduled tasks while it is being scanned.
    if (!await dir.exists()) return;

    // Snapshot the listing first so deletions cannot disturb the iteration.
    final entries = await dir.list().toList();
    var cleanedCount = 0;

    for (final file in entries.whereType<File>()) {
      try {
        final data = jsonDecode(await file.readAsString());
        final ttl = data['ttl'];
        final createdAt = DateTime.parse(
          data['created_at'] ?? DateTime.now().toIso8601String(),
        );
        final isExpired = ttl != null &&
            DateTime.now().isAfter(createdAt.add(Duration(seconds: ttl)));
        if (isExpired) {
          await file.delete();
          cleanedCount++;
        }
      } catch (e) {
        // Skip invalid files
        Khadem.logger.warning(
          '⚠️ Could not process cache file ${file.path}: $e',
        );
      }
    }

    if (cleanedCount > 0) {
      Khadem.logger.info('🧹 Cleaned $cleanedCount expired cache files');
    }
  }
}
Automatically removes expired cache files based on TTL values.
Usage:
job: 'ttl_cleaner'
Config:
cachePath: 'storage/cache'
Configuration
json
// config/app.json
{
"scheduler": {
"tasks": [
{
"name": "health_check",
"interval": 30,
"job": "ping",
"retryOnFail": false
},
{
"name": "cache_cleanup",
"interval": 300,
"job": "ttl_cleaner",
"cachePath": "storage/cache",
"retryOnFail": true,
"maxRetries": 3
},
{
"name": "email_notifications",
"interval": 60,
"job": "email_notifications",
"retryOnFail": true,
"maxRetries": 5
},
{
"name": "database_backup",
"interval": 3600,
"job": "database_backup",
"retryOnFail": true,
"maxRetries": 2
}
]
}
}
// Tasks will be automatically loaded when calling startSchedulers()
Task Configuration Options
Required
- name - Unique task identifier
- interval - Execution interval in seconds
- job - Job name to execute
Optional
- timezone - Timezone (default: UTC)
- retryOnFail - Retry on failure
- runOnce - Run only once
- maxRetries - Max retry attempts
Task Monitoring
dart
// Task monitoring and control
/// Reporting and control helper built on top of a [SchedulerEngine].
class TaskMonitor {
  TaskMonitor(this._scheduler);

  /// Scheduler whose tasks are inspected and controlled.
  final SchedulerEngine _scheduler;

  /// Prints a status report with one section per task known to the scheduler.
  void printTaskStatus() {
    final stats = _scheduler.getStats();
    print('\n📊 Task Status Report:');
    print('=' * 50);

    // Only the statistics objects are needed, not the map keys.
    for (final taskStats in stats.values) {
      print('\n📋 Task: ${taskStats.name}');
      print(' Status: ${taskStats.status}');
      print(' Last Run: ${taskStats.lastRun ?? 'Never'}');
      print(' Next Run: ${taskStats.nextRun ?? 'Not scheduled'}');
      print(' Success Count: ${taskStats.successCount}');
      print(' Failure Count: ${taskStats.failureCount}');
      print(
        ' Avg Execution Time: '
        '${taskStats.averageExecutionTime.toStringAsFixed(2)}ms',
      );
    }
  }

  /// Pauses every task whose failure count is greater than 3 and logs a
  /// warning for each of them.
  void pauseFailingTasks() {
    for (final taskStats in _scheduler.getStats().values) {
      if (taskStats.failureCount > 3) {
        _scheduler.pause(taskStats.name);
        Khadem.logger.warning('⏸️ Paused failing task: ${taskStats.name}');
      }
    }
  }

  /// Returns the names reported by the scheduler's `activeTasks()`.
  List<String> getActiveTasks() => _scheduler.activeTasks();

  /// Whether the scheduler reports [taskName] as running.
  bool isTaskRunning(String taskName) => _scheduler.isRunning(taskName);
}
// Usage: wrap the scheduler in a monitor and hand it to a StatusMonitorJob
// (not shown in this guide) that is scheduled every 5 minutes.
final monitor = TaskMonitor(scheduler);
final statusTask = ScheduledTask(
  name: 'status_monitor',
  job: StatusMonitorJob(monitor),
  interval: Duration(minutes: 5),
);
scheduler.add(statusTask);
Task Statistics
- Execution count (success/failure)
- Last execution time
- Next scheduled run
- Average execution time
- Current task status
- Retry count and history
Error Handling & Retry
dart
// Error handling with retry logic
/// Job that wraps an unreliable operation and leaves retrying to the
/// scheduler.
///
/// A failure is logged and rethrown, so whether and how often the job is
/// retried is decided by the owning task's `retryOnFail` and `maxRetries`
/// settings.
class RetryableJob extends ScheduledJob {
  @override
  String get name => 'retryable_job';

  @override
  Future<void> execute() async {
    try {
      // Your business logic here
      await performUnreliableOperation();
      Khadem.logger.info('✅ Job completed successfully');
    } catch (e) {
      Khadem.logger.error('❌ Job failed: $e');
      // The scheduler will automatically handle retries
      // based on the task's retryOnFail and maxRetries settings
      rethrow;
    }
  }
}
// Create task with retry configuration
final retryableTask = ScheduledTask(
name: 'unreliable_task',
interval: Duration(minutes: 10),
job: RetryableJob(),
retryOnFail: true, // Enable retries
maxRetries: 3, // Maximum 3 retry attempts
);
// Backoff between retries is automatic. The delays listed below grow in
// fixed 5-second steps, i.e. linearly rather than exponentially:
// Attempt 1: immediate retry
// Attempt 2: 5 seconds delay
// Attempt 3: 10 seconds delay
// Attempt 4: 15 seconds delay
scheduler.add(retryableTask);
// Custom error handling job
/// Job showing how different failure types can be treated differently.
///
/// Network errors and unexpected errors are logged and rethrown unchanged;
/// database errors are logged and replaced by a [PermanentFailureException].
class ErrorHandlingJob extends ScheduledJob {
  @override
  Future<void> execute() async {
    try {
      await riskyOperation();
    } on NetworkException catch (e) {
      // Network problems: warn and propagate the original error.
      Khadem.logger.warning('Network error, will retry: $e');
      rethrow;
    } on DatabaseException catch (e) {
      // Database problems: log, then raise a permanent-failure error instead
      // of the original one, because this case is not meant to be retried.
      Khadem.logger.error('Database error: $e');
      throw PermanentFailureException('Database unavailable');
    } catch (e) {
      // Anything else: log and propagate the original error.
      Khadem.logger.error('Unexpected error: $e');
      rethrow;
    }
  }

  @override
  String get name => 'error_handler';
}
Retry Behavior
- Incremental (linear) backoff: 5s, 10s, 15s (configurable)
- Maximum retry attempts (default: 3)
- Automatic failure logging
- Task status updates on failure
- Graceful handling of permanent failures
Best Practices
✓ Do's
- Design jobs to be idempotent (safe to run multiple times)
- Use descriptive task names for easy identification
- Implement proper error handling in job execute() methods
- Monitor task execution and handle failures appropriately
- Use configuration files for task definitions in production
- Test jobs thoroughly before deploying to production
- Log important events and errors within jobs
- Use appropriate intervals based on task requirements
✗ Don'ts
- Don't perform long-running operations without proper timeouts
- Don't rely on task execution order or timing precision
- Don't store large amounts of data in task configurations
- Don't ignore errors or use empty catch blocks
- Don't create tasks with very short intervals unnecessarily
- Don't use blocking operations in job implementations
- Don't forget to handle job cleanup and resource disposal
- Don't use the scheduler for real-time or time-critical operations
API Reference
SchedulerEngine
dart
// API summary of the scheduler engine. Only signatures are listed here;
// method bodies and statement terminators are omitted, so this listing is a
// reference, not compilable code.
class SchedulerEngine implements SchedulerEngineContract {
// Core methods
void add(ScheduledTask task) // Add a new task
void stop(String name) // Stop a specific task
void stopAll() // Stop all tasks
bool isRunning(String name) // Check if task is running
List<String> activeTasks() // Get active task names
void pause(String name) // Pause a task
void resume(String name) // Resume a paused task
// Statistics are returned as a map with String keys
// (presumably the task names — confirm against the implementation).
Map<String, TaskStats> getStats() // Get all task statistics
// Utility methods
// getTask has a nullable return type (ScheduledTask?).
ScheduledTask? getTask(String name) // Get task by name
List<String> getTaskNames() // Get all task names
bool hasTask(String name) // Check if task exists
int get taskCount // Get total task count
void remove(String name) // Remove a task
void clear() // Remove all tasks
}
ScheduledTask
dart
// API summary of a scheduled task. Only signatures are listed here; bodies
// and statement terminators are omitted, so this listing is a reference, not
// compilable code.
class ScheduledTask {
// Constructor
// name, interval and job are required; the remaining parameters fall back to
// the defaults shown below.
ScheduledTask({
required String name, // Unique task name
required Duration interval, // Execution interval
required ScheduledJob job, // Job to execute
String timeZone = 'UTC', // Timezone (currently not used)
bool retryOnFail = false, // Retry on failure
bool runOnce = false, // Run only once
int maxRetries = 3, // Maximum retry attempts
})
// Properties
final String name
final Duration interval
final ScheduledJob job
final String timeZone
final bool retryOnFail
final bool runOnce
final int maxRetries
// Methods
// start, resume and run all take a callback that receives a Duration
// (presumably used to schedule the following run — confirm).
void start(Function(Duration) scheduleNext) // Start the task
void pause() // Pause execution
void resume(Function(Duration) scheduleNext) // Resume execution
Future<void> run(Function(Duration) scheduleNext) // Execute job
void stop() // Stop permanently
TaskStats get stats // Get current statistics
// Factory constructor
// Builds a task from a configuration map.
// NOTE(review): the configuration options list the key as `timezone` while
// the constructor parameter is `timeZone` — confirm which spelling
// fromConfig reads.
factory ScheduledTask.fromConfig(Map<String, dynamic> config)
}
ScheduledJob Interface
dart
/// Contract implemented by every job the scheduler can run.
abstract class ScheduledJob {
  /// Unique job identifier (required).
  String get name;

  /// Business logic of the job (required).
  Future<void> execute();
}
// Example implementation
/// Minimal [ScheduledJob] that prints a timestamped message on every run.
class MyCustomJob extends ScheduledJob {
  @override
  Future<void> execute() async {
    // Your custom logic here
    print('Custom job executed at ${DateTime.now()}');
  }

  @override
  String get name => 'my_custom_job';
}