Queued Tasks Technical Details¶
Architecture, CLI commands, monitoring, and troubleshooting for the Queued Task system.
Architecture Overview¶
The Queued Task system is built around PostgreSQL for reliable task storage and coordination across multiple workers and servers.
Core Components¶
- Task Storage: PostgreSQL tables store task definitions, state, and logs
- Worker Pool: Background processes that execute tasks
- Notification System: PostgreSQL NOTIFY/LISTEN for real-time task dispatch
- Context System: Nested context tracking with database persistence
- Dependency Management: Parent-child task relationships with cascade handling
Task Lifecycle¶
CLI Commands¶
Queue Management¶
# Check queue status
fastedgy queue status
# Start workers
fastedgy queue start --workers=3
# Clear pending tasks (development only)
fastedgy queue clear
# Retry failed tasks
fastedgy queue retry
# View detailed statistics
fastedgy queue stats
# List active servers
fastedgy queue servers
Server Management¶
# Start HTTP server only
fastedgy serve
# Start server with workers
fastedgy serve --workers=3
# Start only workers (no HTTP)
fastedgy serve --workers=3 --no-http
Configuration¶
Environment Variables¶
# Worker settings
QUEUE_MAX_WORKERS=4 # Default: CPU count
QUEUE_WORKER_IDLE_TIMEOUT=60 # Seconds before idle worker shutdown
QUEUE_POLLING_INTERVAL=2 # Seconds between queue polls
QUEUE_FALLBACK_POLLING_INTERVAL=30 # Fallback when NOTIFY fails
# Task settings
QUEUE_TASK_TIMEOUT=300 # Max seconds per task
QUEUE_MAX_RETRIES=3 # Max retry attempts
# Database settings
QUEUE_USE_POSTGRESQL_NOTIFY=true # Enable PostgreSQL NOTIFY/LISTEN
QUEUE_NOTIFY_CHANNEL=queue_new_task # PostgreSQL notification channel
Runtime Configuration¶
from fastedgy.dependencies import get_service
from fastedgy.queued_task import QueuedTaskConfig
# Modify settings at runtime
config = get_service(QueuedTaskConfig)
config.max_workers = 8
config.worker_idle_timeout = 120
config.task_timeout = 600
Database Schema¶
Core Tables¶
-- Tasks
CREATE TABLE queued_tasks (
id UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
state VARCHAR(50) NOT NULL,
parent_task UUID REFERENCES queued_tasks(id),
date_enqueued TIMESTAMP,
date_started TIMESTAMP,
date_done TIMESTAMP,
context JSONB,
result JSONB,
error_message TEXT
);
-- Task logs
CREATE TABLE queued_task_logs (
id SERIAL PRIMARY KEY,
queued_task_id UUID REFERENCES queued_tasks(id),
type VARCHAR(50),
message TEXT,
date_created TIMESTAMP
);
-- Workers
CREATE TABLE queued_task_workers (
id SERIAL PRIMARY KEY,
server_name VARCHAR(255),
max_workers INTEGER,
active_workers INTEGER,
idle_workers INTEGER,
is_running BOOLEAN,
last_heartbeat TIMESTAMP
);
Monitoring and Debugging¶
Database Queries¶
View task status:
-- All recent tasks
SELECT id, name, state, date_enqueued, date_started, date_done, parent_task
FROM queued_tasks
ORDER BY date_enqueued DESC
LIMIT 50;
-- Task logs
SELECT qt.name, qtl.type, qtl.message, qtl.date_created
FROM queued_task_logs qtl
JOIN queued_tasks qt ON qtl.queued_task_id = qt.id
ORDER BY qtl.date_created DESC
LIMIT 100;
-- Active workers
SELECT server_name, max_workers, active_workers, idle_workers, is_running, last_heartbeat
FROM queued_task_workers
WHERE is_running = true;
-- Task hierarchy
WITH RECURSIVE task_hierarchy AS (
SELECT id, name, parent_task, 1 as level, ARRAY[id] as path
FROM queued_tasks
WHERE parent_task IS NULL
UNION ALL
SELECT qt.id, qt.name, qt.parent_task, th.level + 1, th.path || qt.id
FROM queued_tasks qt
JOIN task_hierarchy th ON qt.parent_task = th.id
)
SELECT level, REPEAT(' ', level - 1) || name as indented_name, id, path
FROM task_hierarchy
ORDER BY path;
Performance Monitoring¶
# Custom monitoring endpoint
@router.get("/admin/queue-metrics")
async def get_queue_metrics():
# Query database for metrics
return {
"pending_tasks": await count_pending_tasks(),
"running_tasks": await count_running_tasks(),
"failed_tasks_today": await count_failed_tasks_today(),
"average_task_duration": await get_average_task_duration(),
"active_workers": await count_active_workers()
}
Debug Mode¶
Enable detailed logging:
import logging
# Enable debug logging
logging.getLogger('queued_task').setLevel(logging.DEBUG)
# This shows task creation, pickup, execution steps
Troubleshooting¶
Common Issues¶
Tasks stuck in "enqueued" state: - Check workers are running: fastedgy queue status
- Start workers: fastedgy queue start --workers=3
- Check database connectivity
Tasks failing silently: - Verify function imports available to workers - Check task function signatures match - Review worker logs for Python errors
High memory usage: - Reduce QUEUE_MAX_WORKERS
- Increase QUEUE_WORKER_IDLE_TIMEOUT
- Check for memory leaks in task functions
Database connection issues: - Verify PostgreSQL NOTIFY/LISTEN support - Set QUEUE_USE_POSTGRESQL_NOTIFY=false
to disable - Check database connection pool settings
Diagnostic Commands¶
# Check worker processes
ps aux | grep "fastedgy queue"
# Check database connections
# (Run in PostgreSQL)
SELECT state, count(*) FROM pg_stat_activity
WHERE datname = 'your_database'
GROUP BY state;
# Check notification channels
SELECT * FROM pg_stat_activity WHERE query LIKE '%LISTEN%';
Performance Tuning¶
Optimization Tips¶
- Worker Scaling: Match worker count to workload and CPU cores
- Batch Processing: Group small tasks into batches
- Connection Pooling: Ensure proper database connection pool configuration
- Task Granularity: Balance task size - not too small, not too large
- Context Usage: Use
auto_commit=False
for frequent updates
Database Indexing¶
Optimize task queries:
-- Index for task state queries
CREATE INDEX idx_queued_tasks_state ON queued_tasks(state);
-- Index for date-based queries
CREATE INDEX idx_queued_tasks_date_enqueued ON queued_tasks(date_enqueued);
-- Index for parent-child relationships
CREATE INDEX idx_queued_tasks_parent ON queued_tasks(parent_task);
Production Deployment¶
Multi-Server Setup¶
Deploy across multiple servers:
# Server 1: HTTP + Workers
fastedgy serve --workers=4
# Server 2: Workers only
fastedgy serve --workers=8 --no-http
# Server 3: Workers only
fastedgy serve --workers=8 --no-http
Health Checks¶
Monitor queue health:
@router.get("/health/queue")
async def queue_health():
try:
# Check database connectivity
await test_db_connection()
# Check if workers are responding
active_workers = await count_active_workers()
# Check for stuck tasks
stuck_tasks = await count_stuck_tasks()
if active_workers == 0:
return {"status": "unhealthy", "reason": "no_active_workers"}
if stuck_tasks > 10:
return {"status": "degraded", "reason": "tasks_stuck"}
return {"status": "healthy", "workers": active_workers}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
Backup and Recovery¶
Task data backup:
# Backup task tables
pg_dump -t queued_tasks -t queued_task_logs -t queued_task_workers your_db > queue_backup.sql
# Restore
psql your_db < queue_backup.sql
Integration Patterns¶
With Container Service¶
Tasks can use dependency injection:
from fastedgy.dependencies import get_service
async def service_using_task(data_id: int):
# Get services within task
email_service = get_service(EmailService)
db_service = get_service(DatabaseService)
# Use services
data = await db_service.get_data(data_id)
result = await email_service.send_report(data)
return result
With FastAPI Middleware¶
Custom middleware for task context:
@app.middleware("http")
async def task_context_middleware(request: Request, call_next):
# Set context for tasks triggered by this request
request_id = str(uuid.uuid4())
# Store in request state for access by endpoints
request.state.request_id = request_id
response = await call_next(request)
return response
Security Considerations¶
Task Isolation¶
Tasks run in the same process as workers, so:
- Validate all task inputs
- Sanitize data before processing
- Use resource limits to prevent abuse
- Consider sandboxing for untrusted code
Access Control¶
# Restrict queue management endpoints
@router.post("/admin/queue/clear")
async def clear_queue(current_user: User = Depends(get_admin_user)):
# Only admins can manage queue
clear_all_tasks()
return {"cleared": True}
Migration Guide¶
From Other Task Queues¶
Migrating from Celery or similar:
# Celery style
@celery_app.task
def old_task(data):
return process_data(data)
# FastEdgy style
async def new_task(data):
return await process_data(data)
# Usage
# old_task.delay(data) # Celery
tasks.add_task(new_task, data) # FastEdgy
Quick Reference¶
CLI Commands¶
fastedgy queue status # Check status
fastedgy queue start -w 4 # Start workers
fastedgy queue clear # Clear tasks (dev)
Key Environment Variables¶
QUEUE_MAX_WORKERS=4 # Worker count
QUEUE_TASK_TIMEOUT=300 # Task timeout
QUEUE_MAX_RETRIES=3 # Retry count