Skip to content

Getting Started with Queued Tasks

Set up background task processing in your FastAPI application in 5 minutes.

Prerequisites

  • FastAPI application with FastEdgy
  • PostgreSQL database (for task storage)
  • Basic understanding of async/await

Step 1: Setup Models and Database

Create the queued task models in your project's models/ directory:

models/queued_task.py:

from fastedgy.models.queued_task import BaseQueuedTask
from fastedgy.i18n import _t


class QueuedTask(BaseQueuedTask):
    class Meta:  # type: ignore
        tablename = "queued_tasks"
        label = _t("Tâche en file d'attente")
        label_plural = _t("Tâches en file d'attente")
        default_order_by = [("created_at", "desc")]

models/queued_task_log.py:

from fastedgy.models.queued_task_log import BaseQueuedTaskLog
from fastedgy.i18n import _t


class QueuedTaskLog(BaseQueuedTaskLog):
    class Meta:  # type: ignore
        tablename = "queued_task_logs"
        label = _t("Log de la tâche en file d'attente")
        label_plural = _t("Logs de la tâche en file d'attente")
        default_order_by = [("created_at", "desc")]

models/queued_task_worker.py:

from fastedgy.models.queued_task_worker import BaseQueuedTaskWorker
from fastedgy.i18n import _t


class QueuedTaskWorker(BaseQueuedTaskWorker):
    class Meta:  # type: ignore
        tablename = "queued_task_workers"
        label = _t("Worker de la tâche en file d'attente")
        label_plural = _t("Workers de la tâche en file d'attente")
        default_order_by = [("created_at", "desc")]

models/__init__.py:

from models.queued_task import QueuedTask as QueuedTask
from models.queued_task_log import QueuedTaskLog as QueuedTaskLog
from models.queued_task_worker import QueuedTaskWorker as QueuedTaskWorker

Then create and apply the database migration:

# Create migration for task tables
fastedgy db makemigrations -m "Add queued task models"

# Apply the migration
fastedgy db migrate

This creates tables to store tasks, workers, and logs.

Step 2: Your First Background Task

Create a simple async function to run in the background:

# tasks.py
async def send_welcome_email(user_email: str, username: str):
    """Send a welcome email to new users."""
    # Your email sending logic here
    print(f"Sending welcome email to {user_email} (user: {username})")

    # Simulate email processing time
    await asyncio.sleep(2)

    return {"email_sent": True, "recipient": user_email}

Step 3: Queue Tasks in Your API

Use the task in your FastAPI endpoints:

# main.py
from fastapi import APIRouter
from fastedgy.app import FastEdgy
from fastedgy.dependencies import Inject
from fastedgy.queued_tasks import QueuedTasks
from tasks import send_welcome_email

app = FastEdgy()
router = APIRouter()

@router.post("/users/register")
async def register_user(
    email: str,
    username: str,
    tasks: QueuedTasks = Inject(QueuedTasks)  # Inject the task queue
):
    # Create user in database (fast operation)
    user = {"id": 123, "email": email, "username": username}

    # Queue email sending (slow operation) - runs in background
    email_task = tasks.add_task(
        send_welcome_email,
        user["email"],
        user["username"]
    )

    # Return immediately - user doesn't wait for email
    return {
        "user": user,
        "email_task_queued": True
    }

app.include_router(router)

That's it! Your task is now queued and will be executed by background workers.

Step 4: Start Workers

In a separate terminal, start background workers to process tasks:

# Start 2 background workers
fastedgy queue start --workers=2

You'll see output like:

Starting 2 workers...
Worker 1 ready
Worker 2 ready
Listening for tasks...

Step 5: Test It!

Make a request to your endpoint:

curl -X POST "http://localhost:8000/users/register" \
  -H "Content-Type: application/json" \
  -d '{"email": "user@example.com", "username": "johndoe"}'

Response (immediate):

{
  "user": {"id": 123, "email": "user@example.com", "username": "johndoe"},
  "email_task_queued": true
}

Worker output (2 seconds later):

Sending welcome email to user@example.com (user: johndoe)
Task completed: send_welcome_email

What Just Happened?

  1. User registration completed instantly (database operations)
  2. Email task was queued in PostgreSQL
  3. Background worker picked up the task
  4. Email sent without blocking the user's request

Task Dependencies

Tasks can wait for other tasks to complete:

@router.post("/orders")
async def process_order(
    order_data: dict,
    tasks: QueuedTasks = Inject(QueuedTasks)
):
    # Process order first
    process_task = tasks.add_task(process_payment, order_data["payment_info"])

    # These tasks wait for payment processing to complete
    email_task = tasks.add_task(
        send_order_confirmation,
        order_data["user_email"],
        parent=process_task  # Waits for process_task
    )

    inventory_task = tasks.add_task(
        update_inventory,
        order_data["items"],
        parent=process_task  # Also waits for process_task
    )

    return {"order_queued": True}

Checking Task Status

Get information about your tasks:

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    # You can check task status, but this requires storing task IDs
    # See the User Guide for complete examples
    pass

Common Patterns

Pattern 1: Fire and Forget

# Just queue it - don't need the result
tasks.add_task(send_notification, user_id, "Welcome!")

Pattern 2: Get Task Reference

# Get task ID to track later
task_ref = tasks.add_task(generate_report, user_id)
task_id = await task_ref.get_task_id()
return {"task_id": task_id}
# Process multiple items in parallel
for item in items:
    tasks.add_task(process_item, item.id)

Environment Variables (Optional)

Customize worker behavior:

# .env
QUEUE_MAX_WORKERS=4              # Default: CPU count
QUEUE_WORKER_IDLE_TIMEOUT=60     # Shutdown idle workers after 60s
QUEUE_TASK_TIMEOUT=300           # Max 5 minutes per task

Basic CLI Commands

# Check queue status
fastedgy queue status

# Start workers
fastedgy queue start --workers=3

# Clear all pending tasks (development only)
fastedgy queue clear

Next Steps

You now know the basics! For more advanced features:

Troubleshooting

Tasks not running? - Check workers are started: fastedgy queue status - Check database connection in worker logs

Tasks failing? - Check function imports are available to workers - Verify async function syntax

Performance issues? - Increase worker count: fastedgy queue start --workers=6 - Check database connection pool settings

Quick Reference

from fastedgy.dependencies import Inject
from fastedgy.queued_tasks import QueuedTasks

# Basic usage
tasks: QueuedTasks = Inject(QueuedTasks)

# Queue a task (fire and forget)
tasks.add_task(my_function, arg1, arg2)

# Queue with parent dependency
child_task = tasks.add_task(child_function, parent=parent_task)

# Get task ID for tracking
task_ref = tasks.add_task(my_function, arg)
task_id = await task_ref.get_task_id()

Ready for more advanced patterns? Continue to User Guide →