# Advanced Queued Tasks Usage

Advanced patterns for complex applications, including hooks, lifecycle management, and sophisticated task scenarios.
## Task Lifecycle Hooks

Extend task behavior with hooks:
```python
from fastedgy.queued_task import on_pre_create, on_pre_run, on_post_run

@on_pre_create
async def capture_context(task) -> None:
    """Run before the task is saved to the database."""
    current_workspace = get_current_workspace()
    task.context.update({'workspace_id': current_workspace.id})

@on_pre_run
async def setup_task(task) -> None:
    """Run before task execution."""
    workspace_id = task.context.get('workspace_id')
    if workspace_id:
        setup_workspace_context(workspace_id)

@on_post_run
async def cleanup_task(task, result=None, error=None) -> None:
    """Run after task execution (success or failure)."""
    cleanup_workspace_context()
```
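
Because `on_post_run` receives both the result and the error, a single hook can also centralize failure reporting. Here is a minimal sketch that only relies on the hook signature shown above; the logger name is illustrative:

```python
from fastedgy.queued_task import on_post_run, getLogger

@on_post_run
async def log_failures(task, result=None, error=None) -> None:
    """Report every failed task in one place (sketch)."""
    if error is not None:
        getLogger('tasks.failures').error(
            f"Task failed in workspace {task.context.get('workspace_id')}: {error}"
        )
```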
## Task Control and References

Advanced task management:
```python
from fastedgy.queued_tasks import QueuedTasks
import asyncio

@router.post("/controlled-tasks")
async def manage_tasks(tasks: QueuedTasks = Inject(QueuedTasks)):
    # Create task
    task_ref = tasks.add_task(long_running_process, data)

    # Get task ID
    task_id = await task_ref.get_task_id()

    # Check state
    state = await task_ref.get_state()

    # Cancel if needed
    if some_condition:
        task_ref.cancel()
        return {"cancelled": True}

    # Wait for completion
    try:
        result = await task_ref.wait()
        return {"result": result}
    except asyncio.CancelledError:
        return {"cancelled": True}
```
## Local Functions

Queue locally defined functions:
@router.post("/dynamic-task")
async def create_dynamic_task(config: dict, tasks: QueuedTasks = Inject(QueuedTasks)):
# Define function based on request
async def dynamic_processing(items: list):
for item in items:
if config.get('encrypt'):
item = encrypt_item(item)
await process_item(item)
return {"processed": len(items)}
# Queue the local function - it will be serialized automatically
task_ref = tasks.add_task(dynamic_processing, config['items'])
return {"task_id": await task_ref.get_task_id()}
## Complex Context Management

Detailed progress tracking:
```python
from fastedgy.queued_task import set_context, get_context, getLogger

async def complex_processing(dataset_id: int):
    logger = getLogger('data.processing')

    # Set initial context
    set_context('dataset.id', dataset_id)
    set_context('progress', 0)
    set_context('step', 'validation')

    logger.info(f"Starting processing for dataset {dataset_id}")

    # Validation
    await validate_dataset(dataset_id)
    set_context('validation.status', 'completed')
    set_context('progress', 25)

    # Processing
    set_context('step', 'processing')
    results = await process_data(dataset_id)
    set_context('processing.records', len(results))
    set_context('progress', 75)

    # Completion
    set_context('step', 'completed')
    set_context('progress', 100)

    return {"success": True, "records": len(results)}
```
## Batch Processing

Efficient handling of large datasets:
```python
async def process_user_batch(user_ids: list[int]):
    set_context('batch.total', len(user_ids))
    set_context('batch.processed', 0)

    for i, user_id in enumerate(user_ids):
        await process_single_user(user_id)

        # Update progress every 10 users
        if i % 10 == 0:
            set_context('batch.processed', i + 1)
            progress = ((i + 1) / len(user_ids)) * 100
            set_context('batch.progress_percent', round(progress, 2))

    return {"processed": len(user_ids)}
```
## Advanced Dependencies

Complex task relationships:
```python
async def create_workflow(project_id: int, tasks: QueuedTasks = Inject(QueuedTasks)):
    # Step 1: Prepare data
    prepare_task = tasks.add_task(prepare_data, project_id)

    # Step 2: Parallel processing
    analyze_task = tasks.add_task(analyze_data, project_id, parent=prepare_task)
    transform_task = tasks.add_task(transform_data, project_id, parent=prepare_task)

    # Step 3: Combine results
    combine_task = tasks.add_task(combine_results, project_id, parent=analyze_task)

    return {"workflow_started": True, "final_task": await combine_task.get_task_id()}
```
## Error Recovery

Advanced error handling:
```python
async def resilient_task(primary_source: str, fallback_source: str):
    set_context('strategy', 'primary_with_fallback')

    try:
        set_context('current_source', primary_source)
        result = await process_from_primary(primary_source)
        set_context('source_used', 'primary')
        return result
    except PrimarySourceError as e:
        set_context('primary_error', str(e))

        # Try fallback
        try:
            set_context('current_source', fallback_source)
            result = await process_from_fallback(fallback_source)
            set_context('source_used', 'fallback')
            return result
        except Exception as fallback_error:
            set_context('all_sources_failed', True)
            raise Exception(f"All sources failed: {e}, {fallback_error}")
```
## Performance Optimization

Resource management:
```python
async def resource_intensive_task(data_id: int):
    # Use context managers for expensive resources
    async with get_db_connection() as conn:
        set_context('connection.acquired', True)

        data = await conn.fetch("SELECT * FROM data WHERE id = $1", data_id)
        result = await process_data(data)

        set_context('records.processed', len(result))
        return result
```
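
When a task fans out over many items, an `asyncio.Semaphore` keeps the number of concurrent operations bounded. A generic asyncio sketch, not a fastedgy API; the limit of 10 is illustrative:

```python
import asyncio

async def process_many(data_ids: list[int], limit: int = 10):
    """Process items concurrently while capping parallelism (sketch)."""
    semaphore = asyncio.Semaphore(limit)

    async def process_one(data_id: int):
        # Only `limit` of these run at the same time
        async with semaphore:
            return await resource_intensive_task(data_id)

    return await asyncio.gather(*(process_one(d) for d in data_ids))
```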
## Testing Advanced Patterns

Mock complex scenarios:
```python
import pytest
from unittest.mock import AsyncMock, MagicMock

@pytest.fixture
def mock_tasks():
    # add_task is called synchronously and returns a task reference, so mock the
    # service with MagicMock and the reference's awaitable methods with AsyncMock
    mock = MagicMock()
    mock_ref = AsyncMock()
    mock_ref.get_task_id.return_value = "test-123"
    mock.add_task.return_value = mock_ref

    register_service(mock, QueuedTasks, force=True)
    yield mock
    unregister_service(QueuedTasks)

@pytest.mark.asyncio
async def test_workflow(mock_tasks):
    result = await create_workflow(123, mock_tasks)

    assert mock_tasks.add_task.call_count == 4
    assert result["workflow_started"] is True
```
## Next Steps

- Technical Details → CLI, monitoring, architecture
- Getting Started ← back to basics
- User Guide ← everyday patterns