# Background Job Workers
Complete guide to implementing background job processing with Pythia's job system.
## Overview
Background job workers handle asynchronous task processing, scheduled jobs, and long-running operations outside of the main request/response cycle. Pythia's job system provides priority queues, retry logic, and comprehensive job management.
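Every example in this guide follows the same shape: implement a `JobProcessor`, attach it to a queue-backed `BackgroundJobWorker`, and submit jobs. A minimal end-to-end sketch using the same APIs the examples below introduce:

```python
import asyncio

from pythia.jobs import BackgroundJobWorker, JobProcessor, Job, JobResult
from pythia.jobs.queue import MemoryJobQueue


class EchoProcessor(JobProcessor):
    """Trivial processor: succeed and echo the job's kwargs back"""

    async def process(self, job: Job) -> JobResult:
        return JobResult(success=True, result=job.kwargs)


async def main():
    worker = BackgroundJobWorker(queue=MemoryJobQueue(), processor=EchoProcessor())
    await worker.submit_job(name="echo", func="echo", kwargs={"msg": "hello"})

    worker_task = asyncio.create_task(worker.run())
    await asyncio.sleep(2)  # Give the worker time to drain the queue
    await worker.stop()
    await worker_task


if __name__ == "__main__":
    asyncio.run(main())
```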
## Basic Job Worker

### Simple Email Job Processor
```python
import asyncio
from email.mime.text import MIMEText
from typing import Any, Dict

from pythia.jobs import BackgroundJobWorker, JobProcessor, Job, JobResult
from pythia.jobs.queue import MemoryJobQueue


class EmailJobProcessor(JobProcessor):
    """Process email sending jobs"""

    def __init__(self, smtp_config: Dict[str, Any]):
        self.smtp_config = smtp_config

    async def process(self, job: Job) -> JobResult:
        """Process email job"""
        try:
            # Extract email parameters from job
            to_email = job.kwargs.get('to')
            subject = job.kwargs.get('subject')
            body = job.kwargs.get('body')

            if not all([to_email, subject, body]):
                return JobResult(
                    success=False,
                    error="Missing required email parameters"
                )

            # Send email
            await self._send_email(to_email, subject, body)

            return JobResult(
                success=True,
                result={
                    "email_sent": True,
                    "recipient": to_email,
                    "subject": subject
                }
            )
        except Exception as e:
            return JobResult(
                success=False,
                error=str(e),
                error_type=type(e).__name__
            )

    async def _send_email(self, to_email: str, subject: str, body: str):
        """Send email via SMTP"""
        # Create message
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.smtp_config['from_email']
        msg['To'] = to_email

        # Send via SMTP (in production, use aiosmtplib for async)
        # For demo purposes, we'll simulate the email send
        await asyncio.sleep(0.5)  # Simulate network delay
        print(f"📧 Email sent to {to_email}")
        print(f"   Subject: {subject}")
        print(f"   Body: {body[:100]}...")


# Usage example
async def run_email_jobs():
    # Create SMTP configuration (demo values)
    smtp_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'from_email': 'noreply@example.com',
        'username': 'your_email@gmail.com',
        'password': 'your_password'
    }

    # Create job processor
    processor = EmailJobProcessor(smtp_config)

    # Create job worker
    worker = BackgroundJobWorker(
        queue=MemoryJobQueue(),
        processor=processor,
        max_concurrent_jobs=5,
        polling_interval=1.0
    )

    # Submit email jobs
    jobs = [
        {
            'name': 'welcome_email',
            'func': 'send_email',
            'kwargs': {
                'to': 'new_user@example.com',
                'subject': 'Welcome to Our Platform!',
                'body': 'Thank you for joining us. Get started with your first project.'
            }
        },
        {
            'name': 'password_reset',
            'func': 'send_email',
            'kwargs': {
                'to': 'user@example.com',
                'subject': 'Password Reset Request',
                'body': 'Click this link to reset your password: https://example.com/reset'
            }
        }
    ]

    # Submit jobs
    for job_data in jobs:
        await worker.submit_job(**job_data)

    # Start processing jobs
    print("🚀 Starting email job worker...")

    # Run for a limited time for demo
    job_task = asyncio.create_task(worker.run())
    await asyncio.sleep(10)  # Process for 10 seconds
    await worker.stop()
    await job_task  # Let the run loop finish cleanly


if __name__ == "__main__":
    asyncio.run(run_email_jobs())
```
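The demo only simulates the SMTP send. For a real send that doesn't block the event loop, `aiosmtplib` is an async alternative to the standard-library `smtplib`; a sketch assuming the same `smtp_config` keys as above:

```python
import aiosmtplib
from email.mime.text import MIMEText


async def send_email_async(smtp_config: dict, to_email: str, subject: str, body: str):
    """Send an email via SMTP without blocking the event loop."""
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = smtp_config['from_email']
    msg['To'] = to_email

    # STARTTLS on port 587; adjust for your provider
    await aiosmtplib.send(
        msg,
        hostname=smtp_config['smtp_server'],
        port=smtp_config['smtp_port'],
        username=smtp_config['username'],
        password=smtp_config['password'],
        start_tls=True,
    )
```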
## Advanced Job Processing

### Multi-Type Job Processor
```python
import asyncio
from datetime import datetime, timedelta

import httpx

from pythia.jobs import BackgroundJobWorker, JobProcessor, Job, JobResult, JobPriority
from pythia.jobs.queue import RedisJobQueue


class MultiTypeJobProcessor(JobProcessor):
    """Handle multiple types of jobs"""

    def __init__(self):
        self.http_client = httpx.AsyncClient(timeout=30.0)

    async def process(self, job: Job) -> JobResult:
        """Route job to appropriate handler"""
        job_type = job.name

        try:
            if job_type == 'send_email':
                return await self._handle_email_job(job)
            elif job_type == 'process_image':
                return await self._handle_image_job(job)
            elif job_type == 'sync_data':
                return await self._handle_sync_job(job)
            elif job_type == 'generate_report':
                return await self._handle_report_job(job)
            else:
                return JobResult(
                    success=False,
                    error=f"Unknown job type: {job_type}"
                )
        except Exception as e:
            return JobResult(
                success=False,
                error=str(e),
                error_type=type(e).__name__
            )

    async def _handle_email_job(self, job: Job) -> JobResult:
        """Handle email sending job"""
        # Email logic here
        await asyncio.sleep(0.5)  # Simulate email send
        return JobResult(
            success=True,
            result={"email_sent": True, "recipient": job.kwargs.get('to')}
        )

    async def _handle_image_job(self, job: Job) -> JobResult:
        """Handle image processing job"""
        image_url = job.kwargs.get('image_url')
        operations = job.kwargs.get('operations', ['resize'])

        # Simulate image processing
        await asyncio.sleep(2.0)  # Image processing takes longer

        return JobResult(
            success=True,
            result={
                "image_processed": True,
                "image_url": image_url,
                "operations": operations,
                "output_url": f"https://cdn.example.com/processed/{job.id}.jpg"
            }
        )

    async def _handle_sync_job(self, job: Job) -> JobResult:
        """Handle data synchronization job"""
        source = job.kwargs.get('source')
        destination = job.kwargs.get('destination')

        # Fetch from source API (httpx requests are awaited directly,
        # not used as context managers)
        response = await self.http_client.get(f"https://api.{source}.com/data")
        if response.status_code == 200:
            data = response.json()
            # Sync to destination
            sync_response = await self.http_client.post(
                f"https://api.{destination}.com/sync",
                json=data
            )
            if sync_response.status_code == 200:
                return JobResult(
                    success=True,
                    result={
                        "synced": True,
                        "records_count": len(data.get('records', [])),
                        "source": source,
                        "destination": destination
                    }
                )

        return JobResult(
            success=False,
            error="Failed to sync data"
        )

    async def _handle_report_job(self, job: Job) -> JobResult:
        """Handle report generation job"""
        report_type = job.kwargs.get('type')
        date_range = job.kwargs.get('date_range')

        # Simulate report generation
        await asyncio.sleep(5.0)  # Reports take time to generate

        return JobResult(
            success=True,
            result={
                "report_generated": True,
                "report_type": report_type,
                "date_range": date_range,
                "download_url": f"https://reports.example.com/{job.id}.pdf"
            }
        )


# Advanced usage example
async def run_advanced_jobs():
    # Use Redis for job persistence
    redis_queue = RedisJobQueue(
        host="localhost",
        port=6379,
        db=0,
        queue_name="pythia_jobs"
    )

    processor = MultiTypeJobProcessor()
    worker = BackgroundJobWorker(
        queue=redis_queue,
        processor=processor,
        max_concurrent_jobs=10,
        polling_interval=0.5
    )

    # Submit various job types with different priorities
    jobs = [
        # High priority email
        {
            'name': 'send_email',
            'func': 'send_email',
            'priority': JobPriority.HIGH,
            'kwargs': {
                'to': 'urgent@example.com',
                'subject': 'Urgent: System Alert',
                'body': 'Critical system alert requiring immediate attention.'
            }
        },
        # Normal priority image processing
        {
            'name': 'process_image',
            'func': 'process_image',
            'priority': JobPriority.NORMAL,
            'kwargs': {
                'image_url': 'https://example.com/upload/image1.jpg',
                'operations': ['resize', 'compress', 'watermark']
            }
        },
        # Low priority data sync
        {
            'name': 'sync_data',
            'func': 'sync_data',
            'priority': JobPriority.LOW,
            'kwargs': {
                'source': 'shopify',
                'destination': 'warehouse'
            }
        },
        # Scheduled report generation
        {
            'name': 'generate_report',
            'func': 'generate_report',
            'scheduled_at': datetime.now() + timedelta(minutes=5),  # Run in 5 minutes
            'kwargs': {
                'type': 'monthly_sales',
                'date_range': '2024-01-01 to 2024-01-31'
            }
        }
    ]

    # Submit jobs
    submitted_jobs = []
    for job_data in jobs:
        job = await worker.submit_job(**job_data)
        submitted_jobs.append(job)
        print(f"📝 Submitted job: {job.name} (ID: {job.id})")

    # Start worker
    print("🚀 Starting multi-type job worker...")

    # Monitor jobs
    async def monitor_jobs():
        while True:
            stats = await worker.get_queue_stats()
            print(f"📊 Queue: {stats['queue_size']} | Active: {stats['active_jobs']} | Processed: {stats['worker_stats']['jobs_processed']}")

            # Check individual job status
            for job in submitted_jobs:
                status = await worker.get_job_status(job.id)
                if status:
                    print(f"   Job {job.name}: {status.value}")

            await asyncio.sleep(5)

    # Run worker and monitor concurrently
    monitor_task = asyncio.create_task(monitor_jobs())
    worker_task = asyncio.create_task(worker.run())

    # Run for demo period
    await asyncio.sleep(30)

    # Cleanup
    monitor_task.cancel()
    await worker.stop()
    await worker_task
    await processor.http_client.aclose()


if __name__ == "__main__":
    asyncio.run(run_advanced_jobs())
```
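The demos above run for a fixed period and then stop. A long-lived deployment usually runs until it receives a termination signal instead; a sketch using standard `asyncio` signal handling (Unix only), reusing the `run()`/`stop()` API from the example above:

```python
import asyncio
import signal


async def run_until_signalled(worker):
    """Run the worker until SIGINT/SIGTERM, then stop it gracefully."""
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)

    worker_task = asyncio.create_task(worker.run())
    await stop_event.wait()  # Block until a signal arrives
    await worker.stop()      # Let in-flight jobs finish
    await worker_task
```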
## Scheduled Jobs

### Cron-Like Job Scheduling
```python
import asyncio
from datetime import datetime, timedelta

from pythia.jobs import BackgroundJobWorker, JobProcessor, Job, JobResult, JobPriority


class ScheduledJobProcessor(JobProcessor):
    """Handle scheduled maintenance jobs"""

    async def process(self, job: Job) -> JobResult:
        """Process scheduled jobs"""
        job_type = job.name

        if job_type == 'cleanup_logs':
            return await self._cleanup_old_logs()
        elif job_type == 'backup_database':
            return await self._backup_database()
        elif job_type == 'send_daily_report':
            return await self._send_daily_report()
        elif job_type == 'health_check':
            return await self._perform_health_check()

        return JobResult(success=False, error=f"Unknown scheduled job: {job_type}")

    async def _cleanup_old_logs(self) -> JobResult:
        """Clean up log files older than 30 days"""
        # Simulate log cleanup
        await asyncio.sleep(2.0)

        deleted_files = 15
        freed_space = "2.5GB"

        return JobResult(
            success=True,
            result={
                "task": "log_cleanup",
                "deleted_files": deleted_files,
                "freed_space": freed_space
            }
        )

    async def _backup_database(self) -> JobResult:
        """Perform database backup"""
        # Simulate database backup
        await asyncio.sleep(10.0)

        backup_file = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"

        return JobResult(
            success=True,
            result={
                "task": "database_backup",
                "backup_file": backup_file,
                "size": "1.2GB"
            }
        )

    async def _send_daily_report(self) -> JobResult:
        """Send daily report to stakeholders"""
        # Simulate report generation and sending
        await asyncio.sleep(3.0)

        return JobResult(
            success=True,
            result={
                "task": "daily_report",
                "recipients": ["manager@example.com", "team@example.com"],
                "metrics_included": ["users", "revenue", "errors"]
            }
        )

    async def _perform_health_check(self) -> JobResult:
        """Perform system health check"""
        # Simulate health check
        await asyncio.sleep(1.0)

        return JobResult(
            success=True,
            result={
                "task": "health_check",
                "status": "healthy",
                "services_checked": ["database", "cache", "api", "queue"]
            }
        )


# Scheduled jobs example
async def run_scheduled_jobs():
    processor = ScheduledJobProcessor()
    worker = BackgroundJobWorker(
        processor=processor,
        max_concurrent_jobs=3,
        polling_interval=1.0
    )

    # Schedule recurring jobs (the daily jobs below always start tomorrow;
    # see the helper after this example for picking the next future slot)
    now = datetime.now()

    scheduled_jobs = [
        # Health check every 5 minutes
        {
            'name': 'health_check',
            'func': 'health_check',
            'scheduled_at': now + timedelta(minutes=1),
            'recurring': True,
            'interval': timedelta(minutes=5)
        },
        # Log cleanup daily at 2 AM
        {
            'name': 'cleanup_logs',
            'func': 'cleanup_logs',
            'scheduled_at': now.replace(hour=2, minute=0, second=0, microsecond=0) + timedelta(days=1),
            'recurring': True,
            'interval': timedelta(days=1)
        },
        # Database backup daily at 3 AM
        {
            'name': 'backup_database',
            'func': 'backup_database',
            'priority': JobPriority.HIGH,
            'scheduled_at': now.replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1),
            'recurring': True,
            'interval': timedelta(days=1)
        },
        # Daily report at 9 AM
        {
            'name': 'send_daily_report',
            'func': 'send_daily_report',
            'scheduled_at': now.replace(hour=9, minute=0, second=0, microsecond=0) + timedelta(days=1),
            'recurring': True,
            'interval': timedelta(days=1)
        }
    ]

    # Submit scheduled jobs
    for job_data in scheduled_jobs:
        job = await worker.submit_job(**job_data)
        print(f"📅 Scheduled job: {job.name} at {job.scheduled_at}")

    print("⏰ Starting scheduled job worker...")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(run_scheduled_jobs())
```
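Note that `now.replace(hour=2, ...) + timedelta(days=1)` always schedules the first run for tomorrow, even when today's 2 AM slot is still ahead. A small plain-Python helper picks the next future occurrence instead:

```python
from datetime import datetime, timedelta


def next_run_at(hour: int, minute: int = 0) -> datetime:
    """Return the next future datetime with the given wall-clock time."""
    now = datetime.now()
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # Today's slot has already passed
    return candidate


# e.g. 'scheduled_at': next_run_at(2) for the log cleanup job
```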
## Job Monitoring & Management

### Job Status Dashboard
```python
import asyncio
from datetime import datetime
from typing import Optional

from pythia.jobs import BackgroundJobWorker, JobStatus


class JobManager:
    """Manage and monitor background jobs"""

    def __init__(self, worker: BackgroundJobWorker):
        self.worker = worker

    async def get_job_summary(self) -> dict:
        """Get summary of all jobs"""
        stats = await self.worker.get_queue_stats()
        return {
            "timestamp": datetime.now().isoformat(),
            "queue_stats": stats,
            "worker_health": await self.worker.health_check()
        }

    async def list_jobs_by_status(self, status: Optional[JobStatus] = None) -> list:
        """List jobs filtered by status"""
        # This would require extending the job queue interface;
        # for demo purposes, we simulate the listing
        jobs = [
            {
                "id": "job_001",
                "name": "send_email",
                "status": JobStatus.COMPLETED.value,
                "created_at": "2024-01-15T10:30:00Z",
                "completed_at": "2024-01-15T10:30:02Z"
            },
            {
                "id": "job_002",
                "name": "process_image",
                "status": JobStatus.RUNNING.value,
                "created_at": "2024-01-15T10:31:00Z",
                "started_at": "2024-01-15T10:31:01Z"
            },
            {
                "id": "job_003",
                "name": "generate_report",
                "status": JobStatus.PENDING.value,
                "created_at": "2024-01-15T10:32:00Z",
                "scheduled_at": "2024-01-15T11:00:00Z"
            }
        ]

        if status:
            jobs = [job for job in jobs if job["status"] == status.value]

        return jobs

    async def retry_failed_jobs(self) -> int:
        """Retry all failed jobs that can be retried"""
        failed_jobs = await self.list_jobs_by_status(JobStatus.FAILED)

        retried_count = 0
        for job_data in failed_jobs:
            success = await self.worker.retry_job(job_data["id"])
            if success:
                retried_count += 1

        return retried_count

    async def cancel_pending_jobs(self, job_names: Optional[list] = None) -> int:
        """Cancel pending jobs, optionally filtered by name"""
        pending_jobs = await self.list_jobs_by_status(JobStatus.PENDING)

        cancelled_count = 0
        for job_data in pending_jobs:
            if job_names is None or job_data["name"] in job_names:
                success = await self.worker.cancel_job(job_data["id"])
                if success:
                    cancelled_count += 1

        return cancelled_count

    async def monitor_continuously(self, interval: int = 10):
        """Continuously monitor job worker"""
        print("📊 Starting continuous job monitoring...")

        while True:
            try:
                summary = await self.get_job_summary()

                print(f"\n--- Job Summary at {summary['timestamp']} ---")
                print(f"Queue Size: {summary['queue_stats']['queue_size']}")
                print(f"Active Jobs: {summary['queue_stats']['active_jobs']}")
                print(f"Available Slots: {summary['queue_stats']['available_slots']}")
                print(f"Jobs Processed: {summary['queue_stats']['worker_stats']['jobs_processed']}")
                print(f"Jobs Failed: {summary['queue_stats']['worker_stats']['jobs_failed']}")
                print(f"Worker Health: {'✅ Healthy' if summary['worker_health'] else '❌ Unhealthy'}")

                # Show jobs by status
                for status in [JobStatus.RUNNING, JobStatus.PENDING, JobStatus.FAILED]:
                    jobs = await self.list_jobs_by_status(status)
                    if jobs:
                        print(f"{status.value.upper()}: {len(jobs)} jobs")
                        for job in jobs[:3]:  # Show first 3
                            print(f"  - {job['name']} (ID: {job['id']})")

                await asyncio.sleep(interval)
            except asyncio.CancelledError:
                break  # Stop cleanly when the monitor task is cancelled
            except Exception as e:
                print(f"❌ Monitor error: {e}")
                await asyncio.sleep(interval)


# Usage example
async def run_job_monitoring():
    # Reuse the processor from the advanced example above
    processor = MultiTypeJobProcessor()
    worker = BackgroundJobWorker(processor=processor)

    # Create job manager
    manager = JobManager(worker)

    # Submit some test jobs
    test_jobs = [
        {'name': 'send_email', 'func': 'send_email', 'kwargs': {'to': 'test@example.com'}},
        {'name': 'process_image', 'func': 'process_image', 'kwargs': {'image_url': 'test.jpg'}},
        {'name': 'generate_report', 'func': 'generate_report', 'kwargs': {'type': 'daily'}},
    ]

    for job_data in test_jobs:
        await worker.submit_job(**job_data)

    # Start worker and monitoring
    worker_task = asyncio.create_task(worker.run())
    monitor_task = asyncio.create_task(manager.monitor_continuously())

    # Run for demo
    await asyncio.sleep(30)

    # Cleanup
    monitor_task.cancel()
    await worker.stop()
    await worker_task


if __name__ == "__main__":
    asyncio.run(run_job_monitoring())
```
## Best Practices

### 1. Job Design

```python
# Good: Specific, focused jobs
class EmailJob(JobProcessor):
    async def process(self, job: Job) -> JobResult:
        # Single responsibility: send email
        pass


# Good: Idempotent jobs
class DataSyncJob(JobProcessor):
    async def process(self, job: Job) -> JobResult:
        # Check if sync already completed, so re-runs are safe
        if await self._already_synced(job.kwargs['sync_id']):
            return JobResult(success=True, result="Already synced")
        # Proceed with sync...
```
### 2. Error Handling

```python
# TemporaryError / PermanentError are illustrative; substitute the
# exception types your own job logic raises.
class RobustJobProcessor(JobProcessor):
    async def process(self, job: Job) -> JobResult:
        try:
            # Job logic here
            result = await self._do_work(job)
            return JobResult(success=True, result=result)
        except TemporaryError as e:
            # Retryable error: let the worker try again
            return JobResult(
                success=False,
                error=str(e),
                retryable=True
            )
        except PermanentError as e:
            # Non-retryable error: fail fast, don't requeue
            return JobResult(
                success=False,
                error=str(e),
                retryable=False
            )
```
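When an error is retryable, the retry delay matters as much as the classification. A framework-agnostic sketch of exponential backoff with full jitter, usable with whatever retry delay your queue accepts:

```python
import random


def retry_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff with full jitter: ~1s, 2s, 4s, ... capped at 60s."""
    delay = min(cap, base * (2 ** attempt))
    return random.uniform(0, delay)  # Jitter spreads out retry storms
```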
### 3. Resource Management

```python
import httpx


class ResourceManagedProcessor(JobProcessor):
    def __init__(self):
        self.db_pool = None
        self.http_client = None

    async def startup(self):
        """Initialize resources once, before processing begins"""
        # create_db_pool() is a placeholder, e.g. asyncpg.create_pool(...)
        self.db_pool = await create_db_pool()
        self.http_client = httpx.AsyncClient()

    async def shutdown(self):
        """Cleanup resources"""
        if self.db_pool:
            await self.db_pool.close()
        if self.http_client:
            await self.http_client.aclose()

    async def process(self, job: Job) -> JobResult:
        # Use managed resources
        async with self.db_pool.acquire() as conn:
            # Database operations
            pass
```
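Whether the worker invokes `startup()`/`shutdown()` for you depends on the lifecycle integration (see Worker Lifecycle under Next Steps). A manual wiring sketch, assuming you own the run loop:

```python
async def main():
    processor = ResourceManagedProcessor()
    await processor.startup()
    try:
        worker = BackgroundJobWorker(processor=processor)
        await worker.run()
    finally:
        # Always release pools/clients, even if the run loop errors out
        await processor.shutdown()
```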
## Testing Background Jobs

```python
import pytest

from pythia.jobs import Job, JobPriority
from pythia.jobs.queue import MemoryJobQueue


@pytest.mark.asyncio
class TestEmailJobProcessor:
    async def test_successful_email_job(self):
        """Test successful email processing"""
        # The processor reads 'from_email' when building the message,
        # so the test config must provide it
        processor = EmailJobProcessor({"from_email": "noreply@example.com"})

        job = Job(
            name="test_email",
            func="send_email",
            kwargs={
                "to": "test@example.com",
                "subject": "Test",
                "body": "Test body"
            }
        )

        result = await processor.process(job)

        assert result.success is True
        assert result.result["email_sent"] is True
        assert result.result["recipient"] == "test@example.com"

    async def test_missing_email_parameters(self):
        """Test job with missing parameters"""
        processor = EmailJobProcessor({"from_email": "noreply@example.com"})

        job = Job(
            name="test_email",
            func="send_email",
            kwargs={"to": "test@example.com"}  # Missing subject and body
        )

        result = await processor.process(job)

        assert result.success is False
        assert "Missing required email parameters" in result.error


@pytest.mark.asyncio
async def test_job_queue_operations():
    """Test job queue operations"""
    queue = MemoryJobQueue()

    # Create test job
    job = Job(
        name="test_job",
        func="test_func",
        priority=JobPriority.HIGH
    )

    # Test put/get
    await queue.put(job)
    assert await queue.size() == 1

    retrieved_job = await queue.get()
    assert retrieved_job.id == job.id
    assert retrieved_job.priority == JobPriority.HIGH
```
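A useful companion test exercises the fall-through branch of `MultiTypeJobProcessor` from the advanced example, which should fail cleanly rather than raise:

```python
@pytest.mark.asyncio
async def test_unknown_job_type():
    """Unknown job names should produce a failed result, not an exception"""
    processor = MultiTypeJobProcessor()
    job = Job(name="not_a_real_job", func="noop")

    result = await processor.process(job)

    assert result.success is False
    assert "Unknown job type" in result.error
```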
## Next Steps
- HTTP Workers - HTTP polling and webhook workers
- Message Workers - Message-based worker patterns
- Worker Lifecycle - Understanding worker management