Every application needs to run tasks on a schedule. Send daily reports at 9 AM. Clean up old data every Sunday. Check for updates every 5 minutes. Process uploaded files in the background. These recurring tasks keep systems running smoothly without manual intervention.
Python offers multiple approaches to task scheduling, from simple cron-like libraries to distributed task queues that handle millions of jobs. Choosing the right tool depends on your requirements: simple periodic tasks need lightweight schedulers, while complex workflows require reliable queue systems.
This guide covers task scheduling from basic to advanced. You will learn when to use each tool, how to handle failures, and how to build reliable automation systems.
Scheduling Approaches
Different scheduling needs require different tools. Understanding the options helps you choose the right approach.
When to Use Each Tool
The schedule library works for simple scripts that run continuously. It is a good fit for personal projects or small applications with basic scheduling needs, but not for production systems that need reliability or distributed execution.
APScheduler provides cron-like scheduling with persistence and multiple execution backends. Use when you need reliable scheduling in a single application. Supports background execution, job stores, and multiple triggers.
Celery handles distributed task queues with workers across multiple machines. Choose this for high-volume background processing, complex workflows, or when you need horizontal scaling. Requires a message broker like Redis or RabbitMQ.
Cron remains the standard for system-level scheduling on Linux/Unix. Use for server maintenance tasks, backups, or any job that should run regardless of application state.
Simple Scheduling with schedule
The schedule library provides a clean API for basic periodic tasks.
Basic Usage
import schedule
import time
from datetime import datetime
def job():
    print(f"Task executed at {datetime.now()}")
# Schedule jobs
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(1)
Passing Arguments to Jobs
def send_email(recipient, subject):
    print(f"Sending email to {recipient}: {subject}")
schedule.every().day.at("09:00").do(
    send_email,
    recipient="team@example.com",
    subject="Daily Report"
)
Job Cancellation
def temporary_job():
    print("This job will cancel itself")
    return schedule.CancelJob
# Run once then cancel
schedule.every().day.at("12:00").do(temporary_job)
# Cancel specific job
job = schedule.every().hour.do(some_task)
schedule.cancel_job(job)
# Clear all jobs
schedule.clear()
Decorators for Cleaner Code
import time
from schedule import repeat, every, run_pending

@repeat(every(10).minutes)
def check_updates():
    print("Checking for updates...")

@repeat(every().day.at("08:00"))
def morning_report():
    print("Generating morning report...")

# Jobs are automatically scheduled
while True:
    run_pending()
    time.sleep(1)
Advanced Scheduling with APScheduler
APScheduler provides production-ready scheduling with persistence and multiple backends.
Installation and Setup
pip install apscheduler
Basic Configuration
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
import time
scheduler = BackgroundScheduler()
def my_job():
    print(f"Job executed at {datetime.now()}")
# Add jobs with different triggers
scheduler.add_job(my_job, 'interval', minutes=5)
scheduler.add_job(my_job, 'cron', hour=9, minute=0)
scheduler.add_job(my_job, CronTrigger(day_of_week='mon-fri', hour=17))
# Start scheduler
scheduler.start()
try:
    # Keep main thread alive
    while True:
        time.sleep(2)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
Trigger Types
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
# Run once at a specific time
scheduler.add_job(
    func=one_time_task,
    trigger=DateTrigger(run_date=datetime(2024, 12, 31, 23, 59))
)

# Run every N seconds/minutes/hours
scheduler.add_job(
    func=periodic_task,
    trigger=IntervalTrigger(seconds=30)
)

# Cron-style scheduling
scheduler.add_job(
    func=daily_task,
    trigger=CronTrigger(
        day_of_week='mon-fri',
        hour=9,
        minute=0,
        timezone='America/New_York'
    )
)

# Complex cron expression
scheduler.add_job(
    func=complex_task,
    trigger=CronTrigger.from_crontab('0 */2 * * *')  # Every 2 hours
)
Job Stores for Persistence
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# Configure job stores and executors
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,   # Run every missed execution instead of merging them into one
    'max_instances': 3   # Max concurrent instances per job
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)

# Jobs persist across restarts
scheduler.add_job(
    func=important_task,
    trigger='cron',
    hour=10,
    id='daily_report',  # Unique ID for persistence
    replace_existing=True
)
scheduler.start()
Error Handling
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def job_listener(event):
    if event.exception:
        logger.error(f"Job {event.job_id} failed: {event.exception}")
    else:
        logger.info(f"Job {event.job_id} completed successfully")
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
def risky_job():
    try:
        # Task logic
        result = perform_operation()
        return result
    except Exception as e:
        logger.error(f"Job failed: {e}")
        # Re-raise so the error listener above fires
        raise

scheduler.add_job(
    func=risky_job,
    trigger='interval',
    minutes=10,
    max_instances=1,
    misfire_grace_time=300  # Still run if the scheduled time was missed by up to 5 minutes
)
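Note that APScheduler reports failures through listeners but does not re-run a failed job by itself. If a job should retry transient failures within a single scheduled run, a small wrapper works; `with_retries` below is a hypothetical helper, not part of APScheduler:

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def with_retries(attempts=3, delay=1.0, backoff=2.0):
    """Retry a job function within a single scheduled run."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == attempts:
                        logger.error(f"{func.__name__} failed after {attempts} attempts")
                        raise
                    logger.warning(f"{func.__name__} attempt {attempt} failed: {exc}")
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator

calls = []

@with_retries(attempts=3, delay=0.01)
def flaky_job():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky_job())  # succeeds on the third attempt
```

The decorated function can then be registered with `scheduler.add_job` like any other callable.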
Distributed Task Queues with Celery
Celery handles background tasks across multiple workers with support for retries, priorities, and complex workflows.
Setup and Configuration
pip install celery redis
Create celery_app.py:
from celery import Celery
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,       # Hard limit: 30 minutes
    task_soft_time_limit=25 * 60,  # Soft limit: 25 minutes
)
Defining Tasks
from celery_app import app
import time
@app.task
def add(x, y):
    return x + y

@app.task(bind=True, max_retries=3)
def process_data(self, data_id):
    try:
        # Simulate processing
        time.sleep(5)
        return f"Processed {data_id}"
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@app.task
def send_email(recipient, subject, body):
    # Email sending logic
    print(f"Sending email to {recipient}")
    return True
Calling Tasks
# Queue the task, then block waiting for the result
result = add.delay(4, 6)
print(result.get(timeout=10))  # 10

# Fire and forget (delay() itself returns immediately)
task = process_data.delay(123)
print(f"Task ID: {task.id}")

# Check status later
if task.ready():
    print(f"Result: {task.result}")
else:
    print("Task still running")

# Chain tasks; immutable signatures (.si) stop each result
# from being passed as an argument to the next task
from celery import chain

workflow = chain(
    process_data.si(1),
    process_data.si(2),
    send_email.si('admin@example.com', 'Done', 'Processing complete')
)
workflow.apply_async()
Periodic Tasks with Celery Beat
from celery.schedules import crontab
from celery_app import app
app.conf.beat_schedule = {
    'cleanup-every-night': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),
    },
    'send-report-weekdays': {
        'task': 'tasks.send_daily_report',
        'schedule': crontab(
            hour=9,
            minute=0,
            day_of_week='mon-fri'
        ),
    },
    'check-every-5-minutes': {
        'task': 'tasks.health_check',
        'schedule': 300.0,  # Seconds
    },
}
@app.task
def cleanup_old_data():
    print("Cleaning up old data...")

@app.task
def send_daily_report():
    print("Sending daily report...")

@app.task
def health_check():
    print("Running health check...")
Start Celery worker and beat:
# Start worker
celery -A celery_app worker --loglevel=info
# Start beat scheduler (in separate terminal)
celery -A celery_app beat --loglevel=info
Task Priorities and Routing
from kombu import Queue
app.conf.task_routes = {
    'tasks.high_priority_task': {'queue': 'high'},
    'tasks.low_priority_task': {'queue': 'low'},
}

app.conf.task_queues = (
    Queue('high', routing_key='high'),
    Queue('default', routing_key='default'),
    Queue('low', routing_key='low'),
)

@app.task
def high_priority_task():
    print("High priority task")

@app.task
def low_priority_task():
    print("Low priority task")

# Call with a specific queue
high_priority_task.apply_async(queue='high')
Start workers for specific queues:
# High priority worker
celery -A celery_app worker -Q high --loglevel=info
# Low priority worker
celery -A celery_app worker -Q low --loglevel=info
System Cron Jobs
For system-level tasks, cron remains the standard on Linux/Unix systems.
Cron Syntax
* * * * * command
│ │ │ │ │
│ │ │ │ └─── Day of week (0-7, Sunday=0 or 7)
│ │ │ └───── Month (1-12)
│ │ └─────── Day of month (1-31)
│ └───────── Hour (0-23)
└─────────── Minute (0-59)
Common Patterns
# Edit crontab
crontab -e
# Every day at 2 AM
0 2 * * * /usr/bin/python3 /path/to/script.py
# Every Monday at 9 AM
0 9 * * 1 /usr/bin/python3 /path/to/weekly_report.py
# Every 15 minutes
*/15 * * * * /usr/bin/python3 /path/to/check.py
# First day of every month
0 0 1 * * /usr/bin/python3 /path/to/monthly_task.py
# Weekdays at 6 PM
0 18 * * 1-5 /usr/bin/python3 /path/to/weekday_task.py
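One common pitfall: if a job takes longer than its interval, cron happily starts a second copy. Wrapping the command in `flock` skips a run while the previous one still holds the lock (the lock-file path is arbitrary):

```shell
# Skip this run if the previous one is still holding the lock
*/15 * * * * /usr/bin/flock -n /tmp/check.lock /usr/bin/python3 /path/to/check.py
```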
Logging and Error Handling
# Redirect output to log file
0 2 * * * /usr/bin/python3 /path/to/script.py >> /var/log/myscript.log 2>&1
# Email errors (if mail is configured)
MAILTO=admin@example.com
0 2 * * * /usr/bin/python3 /path/to/script.py
# Set environment variables
PATH=/usr/local/bin:/usr/bin:/bin
PYTHONPATH=/path/to/project
0 2 * * * /usr/bin/python3 /path/to/script.py
Python Script for Cron
#!/usr/bin/env python3
import sys
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
    filename='/var/log/myscript.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def main():
    try:
        logging.info("Task started")
        # Task logic here
        result = perform_task()
        logging.info(f"Task completed: {result}")
        return 0
    except Exception as e:
        logging.error(f"Task failed: {e}", exc_info=True)
        return 1

if __name__ == "__main__":
    sys.exit(main())
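The overlap problem can also be handled inside the script itself with an advisory file lock. A sketch using the POSIX-only `fcntl` module (the lock path is arbitrary; on Windows a different mechanism such as `msvcrt.locking` would be needed):

```python
#!/usr/bin/env python3
import fcntl
import sys

def acquire_lock(path):
    """Return a locked file handle, or None if another run holds the lock."""
    handle = open(path, "w")
    try:
        # Non-blocking exclusive lock; fails immediately if already held
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return handle
    except BlockingIOError:
        handle.close()
        return None

lock = acquire_lock("/tmp/myscript.lock")
if lock is None:
    print("Previous run still in progress, exiting")
    sys.exit(0)

# ... task logic runs here while the lock is held ...
print("Lock acquired, running task")
```

The lock is released automatically when the process exits, so a crashed run never leaves the job permanently blocked.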
Monitoring and Alerting
Production scheduling systems need monitoring to detect failures and performance issues.
Health Checks
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import requests
class SchedulerMonitor:
    def __init__(self):
        self.last_run = {}
        self.failures = {}

    def record_success(self, job_id):
        self.last_run[job_id] = datetime.now()
        self.failures[job_id] = 0

    def record_failure(self, job_id):
        self.failures[job_id] = self.failures.get(job_id, 0) + 1

    def check_health(self):
        issues = []
        now = datetime.now()
        for job_id, last_run in self.last_run.items():
            # Alert if job hasn't run in 2 hours
            if now - last_run > timedelta(hours=2):
                issues.append(f"Job {job_id} hasn't run in 2+ hours")
            # Alert on repeated failures
            if self.failures.get(job_id, 0) >= 3:
                issues.append(f"Job {job_id} failed 3+ times")
        return issues

monitor = SchedulerMonitor()

def monitored_job(job_id):
    try:
        # Job logic
        result = perform_task()
        monitor.record_success(job_id)
        return result
    except Exception:
        monitor.record_failure(job_id)
        raise
# Check health every 10 minutes
scheduler = BackgroundScheduler()
scheduler.add_job(
    lambda: send_alerts(monitor.check_health()),  # send_alerts is an alerting hook you provide
    'interval',
    minutes=10
)
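A minimal `send_alerts` might log each issue and optionally forward the batch to a webhook. This is a hypothetical sketch; `WEBHOOK_URL` and the payload shape are placeholders you would adapt to your alerting service:

```python
import json
import logging
import urllib.request

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hypothetical endpoint; replace with your alerting service
WEBHOOK_URL = "https://hooks.example.com/alerts"

def send_alerts(issues, webhook_url=None):
    """Log health-check issues and optionally forward them to a webhook."""
    if not issues:
        return 0  # nothing to report
    for issue in issues:
        logger.warning(f"ALERT: {issue}")
    if webhook_url:
        payload = json.dumps({"text": "\n".join(issues)}).encode()
        request = urllib.request.Request(
            webhook_url,
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(request, timeout=10)
    return len(issues)

# With no webhook configured, alerts are only logged
sent = send_alerts(["Job daily_report hasn't run in 2+ hours"])
```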
Celery Monitoring with Flower
pip install flower
# Start Flower web UI
celery -A celery_app flower --port=5555
Access the dashboard at http://localhost:5555 to view:
- Active workers and tasks
- Task success/failure rates
- Queue lengths
- Task execution times
Dead Letter Queues
import logging

from celery_app import app

@app.task(bind=True, max_retries=3)
def unreliable_task(self, data):
    try:
        # Task logic
        process(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue after max retries
            dead_letter_task.delay(data, str(exc))
            return
        raise self.retry(exc=exc, countdown=60)

@app.task
def dead_letter_task(data, error):
    # Log the failed task for manual review
    logging.error(f"Task permanently failed: {data}, Error: {error}")
    # Store in a database for later analysis
    save_failed_task(data, error)
Best Practices
Idempotency
Make tasks safe to run multiple times:
@app.task
def process_order(order_id):
    # Check if already processed
    if Order.objects.filter(id=order_id, status='processed').exists():
        return "Already processed"

    # Process order
    order = Order.objects.get(id=order_id)
    order.process()
    order.status = 'processed'
    order.save()
    return "Processed successfully"
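The same guard works without an ORM: record a key per completed unit of work and check it before doing anything. Here an in-memory set stands in for what would be a database table or Redis set in production:

```python
processed = set()  # stand-in for a persistent store (database row, Redis set, ...)

def process_order(order_id):
    """Safe to call any number of times for the same order."""
    if order_id in processed:
        return "Already processed"
    # ... real processing would happen here ...
    processed.add(order_id)
    return "Processed successfully"

print(process_order(42))  # Processed successfully
print(process_order(42))  # Already processed
```

Because the retry path returns early, a task that is delivered twice (or retried after a partial failure) does its side effects at most once.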
Timeout Handling
from celery.exceptions import SoftTimeLimitExceeded
@app.task(time_limit=300, soft_time_limit=270)
def long_running_task():
    try:
        # Task logic
        for item in large_dataset:
            process(item)
    except SoftTimeLimitExceeded:
        # Clean up before the hard timeout kills the worker process
        cleanup()
        raise
Resource Management
@app.task
def database_task():
    connection = None
    try:
        connection = get_database_connection()
        # Use connection
        result = connection.execute(query)
        return result
    finally:
        if connection:
            connection.close()
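The try/finally pattern can be factored into a context manager so every task acquires and releases resources the same way. A self-contained sketch with a fake connection class standing in for a real database client:

```python
from contextlib import contextmanager

class FakeConnection:
    """Stand-in for a real database connection."""
    def __init__(self):
        self.closed = False

    def execute(self, query):
        return f"ran: {query}"

    def close(self):
        self.closed = True

@contextmanager
def database_connection():
    conn = FakeConnection()  # a real task would open an actual connection here
    try:
        yield conn
    finally:
        conn.close()  # always released, even if the task raises

def database_task(query):
    with database_connection() as conn:
        return conn.execute(query)

result = database_task("SELECT 1")
```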
Rate Limiting
@app.task(rate_limit='10/m')  # At most 10 tasks per minute, per worker
def api_call(endpoint):
    response = requests.get(endpoint)
    return response.json()

# Rate limit strings accept /s, /m, and /h suffixes
@app.task(rate_limit='100/h')  # At most 100 per hour, per worker
def limited_task():
    pass
Summary
Task scheduling automates recurring work and background processing. The right tool depends on your needs: simple scripts use schedule, single applications use APScheduler, distributed systems use Celery, and system tasks use cron.
Key principles for reliable scheduling: make tasks idempotent so they can safely retry, implement proper error handling and logging, monitor execution to detect failures early, and use appropriate timeouts to prevent hung tasks.
APScheduler provides production-ready scheduling with persistence and multiple backends. Celery handles distributed task queues with workers across multiple machines. Both support cron-like scheduling, retries, and monitoring.
For monitoring, track last execution times, failure counts, and queue lengths. Use health checks to detect stuck jobs. Implement dead letter queues for tasks that fail repeatedly.
Start with the simplest tool that meets your requirements. Add complexity only when needed. Good scheduling systems are boring: they run reliably without attention.
For more automation content, check our guides on Python automation scripts and production best practices.