Queues & Background Jobs

Ferro provides a database-backed queue for processing jobs asynchronously. The queue uses the application's existing DatabaseConnection — no separate external queue server is needed. The WorkerLoop runs in-process inside Application::run and is started automatically when at least one job type is registered.

Atomic job claiming is implemented for two database backends:

  • Postgres: SELECT … FOR UPDATE SKIP LOCKED inside a transaction
  • SQLite: raw BEGIN IMMEDIATE + UPDATE … RETURNING

Both paths claim exactly one job per cycle; two workers on the same table cannot double-claim a row.
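To make the no-double-claim guarantee concrete, here is a minimal in-memory model of it. It is a sketch, not Ferro's code: a `Mutex` plays the role of the row lock that `FOR UPDATE SKIP LOCKED` or `BEGIN IMMEDIATE` provides, and `JobRow`, `Status`, `claim_one`, and `drain_with_workers` are hypothetical names. What it shows is that "find a pending row" and "mark it claimed" happen as one atomic step.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical row states; not Ferro's actual schema.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Claimed,
}

struct JobRow {
    id: u64,
    status: Status,
}

/// Claim at most one pending row. Holding the lock models the database's
/// atomic claim: no other worker can interleave between "find" and "mark".
fn claim_one(table: &Mutex<Vec<JobRow>>) -> Option<u64> {
    let mut rows = table.lock().unwrap();
    let row = rows.iter_mut().find(|r| r.status == Status::Pending)?;
    row.status = Status::Claimed;
    Some(row.id)
}

/// Spawn `workers` threads that each claim until the table is drained,
/// and return every id claimed by any worker.
fn drain_with_workers(job_count: u64, workers: usize) -> Vec<u64> {
    let table = Arc::new(Mutex::new(
        (0..job_count)
            .map(|id| JobRow { id, status: Status::Pending })
            .collect::<Vec<_>>(),
    ));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let table = Arc::clone(&table);
            thread::spawn(move || {
                let mut claimed = Vec::new();
                while let Some(id) = claim_one(&table) {
                    claimed.push(id);
                }
                claimed
            })
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

With several threads draining the same rows, every id is claimed exactly once. In the real queue the database enforces this rather than a process-local lock, which is what lets the guarantee hold for any two workers sharing the table.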

Setup

Migration

Register CreateJobsTable in your application's Migrator alongside your own migrations:

use ferro_queue::CreateJobsTable;
use sea_orm_migration::prelude::*;

pub struct Migrator;

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
    fn migrations() -> Vec<Box<dyn MigrationTrait>> {
        vec![
            Box::new(CreateJobsTable),
            // ... your own migrations
        ]
    }
}

Registration

Register job types in your bootstrap before the server starts. The framework's server boot path (inside Application::run) detects registered job types and spawns a WorkerLoop automatically — no separate process or CLI command required.

// src/bootstrap.rs
use ferro::queue::Queue;
use crate::jobs::{ProcessPayment, SendEmail, GenerateReport};

pub async fn register() {
    // Register job types; the framework auto-starts the WorkerLoop.
    Queue::register::<ProcessPayment>();
    Queue::register::<SendEmail>();
    Queue::register::<GenerateReport>();
}

Environment Variables

# Queue driver: "sync" for development (jobs run inline), any other value for background.
# IMPORTANT: when QUEUE_CONNECTION is UNSET it defaults to "sync" — background
# processing is OFF unless you set this to a non-sync value (e.g. "db").
QUEUE_CONNECTION=db

# Default queue name
QUEUE_DEFAULT=default

# Maximum concurrent jobs per worker instance
QUEUE_MAX_CONCURRENT=10

QUEUE_CONNECTION defaults to sync when unset. In sync mode jobs run inline during the HTTP request — no background worker, no database polling — and .delay() / .on_queue() are ignored. Set any other value (e.g. db) to enable background processing. If jobs are registered while the queue is in sync mode, the server logs a startup warning, since this combination is usually unintended in production.
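The mode selection described above fits in a small pure function. This is an illustrative sketch, not Ferro's source: `QueueMode`, `queue_mode`, `should_warn`, and `queue_mode_from_env` are hypothetical names, and the sketch assumes an exact, case-sensitive match on "sync".

```rust
// Hypothetical model of how QUEUE_CONNECTION selects a mode.
#[derive(Debug, PartialEq)]
enum QueueMode {
    /// Jobs run inline during the request; .delay() and .on_queue() are ignored.
    Sync,
    /// Jobs are written to the jobs table and processed by the WorkerLoop.
    Background,
}

/// Unset or "sync" selects Sync; any other value enables background processing.
fn queue_mode(value: Option<&str>) -> QueueMode {
    match value {
        None | Some("sync") => QueueMode::Sync,
        Some(_) => QueueMode::Background,
    }
}

/// The startup-warning condition: job types registered while in sync mode.
fn should_warn(mode: &QueueMode, registered_job_types: usize) -> bool {
    *mode == QueueMode::Sync && registered_job_types > 0
}

/// Reads the actual environment variable.
fn queue_mode_from_env() -> QueueMode {
    queue_mode(std::env::var("QUEUE_CONNECTION").ok().as_deref())
}
```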

Creating Jobs

Using the CLI

ferro make:job ProcessPayment

This creates src/jobs/process_payment.rs:

use ferro::queue::{Job, Error, async_trait};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessPayment {
    pub order_id: i64,
    pub amount: f64,
}

#[async_trait]
impl Job for ProcessPayment {
    async fn handle(&self) -> Result<(), Error> {
        tracing::info!("Processing payment for order {}", self.order_id);
        // Payment processing logic...
        Ok(())
    }

    fn max_retries(&self) -> u32 {
        3
    }
}

Job Trait Methods

| Method | Description | Default |
|---|---|---|
| handle() | Job execution logic | Required |
| name() | Job identifier for logging | Type name |
| max_retries() | Retry attempts on failure | 3 |
| retry_delay(attempt) | Delay before a retry | Full-jitter exponential (see below) |
| timeout() | Maximum execution time | 60 seconds |
| failed(error) | Called when all retries are exhausted | Logs the error |
| idempotency_key() | Deduplication key on enqueue | None |

Retry Delay Default

The default retry_delay uses full-jitter exponential backoff: rand(0..=min(cap, base × 2^attempt)) where base = 5 s and cap = 15 min. Override it on individual job types:

// Inside `impl Job for YourJob { ... }`:
fn retry_delay(&self, _attempt: u32) -> std::time::Duration {
    // Fixed 30-second delay regardless of attempt count.
    std::time::Duration::from_secs(30)
}
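For reference, the default formula can be sketched as follows. Assumptions, all hypothetical: `attempt` is used directly as the exponent (the text does not say whether counting starts at 0 or 1), and the random draw is passed in as `unit`, a value in [0.0, 1.0], so the function stays deterministic; in an application it would come from a random number generator such as the `rand` crate.

```rust
use std::time::Duration;

const BASE: Duration = Duration::from_secs(5);
const CAP: Duration = Duration::from_secs(15 * 60);

/// Upper bound of the jitter window: min(cap, base * 2^attempt).
/// Overflow (very large attempts) saturates to CAP.
fn backoff_ceiling(attempt: u32) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    BASE.checked_mul(factor).map_or(CAP, |d| d.min(CAP))
}

/// Full jitter: a uniform draw in [0, ceiling].
/// `unit` is the hypothetical random input, expected in [0.0, 1.0].
fn full_jitter(attempt: u32, unit: f64) -> Duration {
    backoff_ceiling(attempt).mul_f64(unit.clamp(0.0, 1.0))
}
```

Capping the ceiling at 15 minutes keeps late retries from drifting hours apart, and drawing anywhere in [0, ceiling] spreads out retries from jobs that failed at the same moment (for example, during a brief database outage).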

Idempotency Keys

Provide idempotency_key() to prevent duplicate jobs when the same event fires more than once. Enqueue skips insertion when a pending or claimed row with the same (job_type, idempotency_key) already exists:

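use ferro::queue::{Job, Error, async_trait};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendInvoice {
    pub invoice_id: i64,
}

#[async_trait]
impl Job for SendInvoice {
    fn idempotency_key(&self) -> Option<String> {
        Some(format!("send-invoice-{}", self.invoice_id))
    }

    async fn handle(&self) -> Result<(), Error> {
        // Repeat dispatches for the same invoice_id are skipped while an
        // earlier copy is still pending or claimed.
        Ok(())
    }
}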
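The enqueue-side rule can be modeled in a few lines. This is a hypothetical in-memory sketch (`JobsTable` and its tuple rows are illustrative, not Ferro's schema) that applies the rule exactly as stated: skip the insert only when a row with the same job type and key is pending or claimed. It also shows the consequence that once the earlier copy has left those states (here, parked as failed), the same key can be enqueued again.

```rust
// Hypothetical subset of row states, taken from the statuses this page mentions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Claimed,
    Failed,
}

/// Hypothetical in-memory stand-in for the jobs table.
struct JobsTable {
    // (job_type, idempotency_key, status)
    rows: Vec<(String, Option<String>, Status)>,
}

impl JobsTable {
    /// Returns true if the job was inserted, false if skipped as a duplicate.
    fn enqueue(&mut self, job_type: &str, key: Option<&str>) -> bool {
        if let Some(key) = key {
            let duplicate = self.rows.iter().any(|(t, k, s)| {
                t == job_type
                    && k.as_deref() == Some(key)
                    && matches!(s, Status::Pending | Status::Claimed)
            });
            if duplicate {
                return false;
            }
        }
        // Jobs without a key (None) are never deduplicated.
        self.rows
            .push((job_type.to_string(), key.map(str::to_string), Status::Pending));
        true
    }
}
```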

Dispatching Jobs

Basic Dispatch

use crate::jobs::ProcessPayment;

ProcessPayment {
    order_id: 123,
    amount: 99.99,
}
.dispatch()
.await?;

With Delay

use std::time::Duration;

ProcessPayment { order_id: 123, amount: 99.99 }
    .delay(Duration::from_secs(300))  // Run after 5 minutes
    .dispatch()
    .await?;

To Specific Queue

ProcessPayment { order_id: 123, amount: 99.99 }
    .on_queue("high-priority")
    .dispatch()
    .await?;

Combining Options

use std::time::Duration;

ProcessPayment { order_id: 123, amount: 99.99 }
    .delay(Duration::from_secs(60))
    .on_queue("payments")
    .dispatch()
    .await?;

WorkerLoop Configuration

The framework creates a WorkerLoop with WorkerConfig::default() when job types are registered. Override the configuration by calling WorkerLoop::new(config) directly if you need custom settings.

use ferro::queue::{WorkerConfig, WorkerLoop};
use std::time::Duration;

let config = WorkerConfig {
    queues: vec!["high-priority".into(), "default".into()],
    max_jobs: 20,
    sleep_duration: Duration::from_millis(500),
    visibility_timeout: Duration::from_secs(300), // 5 min (the default)
    ..Default::default()
};

// Construct the worker with the custom settings.
let worker = WorkerLoop::new(config);

| Field | Description | Default |
|---|---|---|
| queues | Queue names to process, in priority order | ["default"] |
| max_jobs | Maximum concurrent in-flight jobs | 10 |
| sleep_duration | Idle poll interval when the queue is empty | 1s |
| visibility_timeout | Time before a claimed job is reclaimed by the reaper | 300s |
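The table says queues are processed "in priority order". One plausible reading, assumed here rather than confirmed by the source, is strict priority: each claim cycle takes work from the first listed queue that has a ready job, and falls through to later queues only when earlier ones are empty. A sketch of that selection step, with a hypothetical `next_queue` function and `ready` map of ready-job counts per queue:

```rust
use std::collections::HashMap;

/// Strict-priority selection (assumed semantics): return the first queue in
/// `priority` order with at least one ready job, or None if all are empty.
fn next_queue<'a>(priority: &'a [String], ready: &HashMap<String, usize>) -> Option<&'a str> {
    priority
        .iter()
        .find(|q| ready.get(q.as_str()).copied().unwrap_or(0) > 0)
        .map(String::as_str)
}
```

Under strict priority, a continuously busy "high-priority" queue would starve "default"; if that matters for your workload, confirm the worker's actual ordering behavior.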

CPU-Heavy Jobs

The WorkerLoop runs on the async executor. Jobs that do CPU-bound work (PDF rendering, image processing, compression) must wrap that work in tokio::task::spawn_blocking; otherwise the work occupies an executor thread for its whole duration and starves other jobs scheduled on the same runtime:

use ferro::queue::{Job, Error, async_trait};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RenderDocumentPdfJob {
    pub document_id: i64,
}

#[async_trait]
impl Job for RenderDocumentPdfJob {
    async fn handle(&self) -> Result<(), Error> {
        let document_id = self.document_id;

        // spawn_blocking moves CPU work off the async executor thread pool.
        tokio::task::spawn_blocking(move || {
            // CPU-intensive PDF rendering here...
            render_pdf(document_id)
        })
        .await
        .map_err(|e| Error::custom(format!("spawn_blocking join: {e}")))?
        .map_err(|e| Error::custom(format!("render_pdf: {e}")))
    }
}

fn render_pdf(_document_id: i64) -> Result<(), String> {
    // synchronous rendering work
    Ok(())
}

This applies to any job doing significant CPU work: document rendering, image resizing, compression, or large in-memory data transformations.

Error Handling

Automatic Retries

Failed jobs are automatically retried based on max_retries() and retry_delay(). After all retries are exhausted, the job is parked as failed with the error message recorded:

use ferro::queue::{Job, Error, async_trait};

#[async_trait]
impl Job for ProcessPayment {
    // handle() as defined earlier; omitted here for brevity.

    fn max_retries(&self) -> u32 {
        5
    }

    async fn failed(&self, error: &Error) {
        tracing::error!(
            order_id = self.order_id,
            error = %error,
            "Payment processing permanently failed"
        );
        // Notify, update order status, etc.
    }
}

Stuck Job Reaper

The worker runs a reaper before each claim cycle. Jobs that have been claimed for longer than the visibility_timeout (default 5 min) are:

  • Reset to pending with attempts + 1 if they have retries remaining
  • Parked as failed if they have exhausted max_retries

This recovers from worker crashes without any manual intervention.
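The reaper's rule can be sketched as a single pass over claimed rows. Assumptions (hypothetical names and semantics): the `JobRow` fields are illustrative, "retries remaining" is read as `attempts < max_retries`, and "longer than the visibility_timeout" is a strict comparison against the claim time.

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Claimed,
    Failed,
}

// Hypothetical row shape for illustration only.
struct JobRow {
    status: Status,
    attempts: u32,
    max_retries: u32,
    claimed_at: Option<SystemTime>,
}

/// One reaper pass: stale claims go back to pending (attempts + 1)
/// or are parked as failed once retries are exhausted.
fn reap(rows: &mut [JobRow], now: SystemTime, visibility_timeout: Duration) {
    for row in rows.iter_mut() {
        let stale = row.status == Status::Claimed
            && row.claimed_at.map_or(false, |t| {
                now.duration_since(t).unwrap_or(Duration::ZERO) > visibility_timeout
            });
        if !stale {
            continue;
        }
        row.claimed_at = None;
        if row.attempts < row.max_retries {
            row.status = Status::Pending;
            row.attempts += 1;
        } else {
            row.status = Status::Failed;
        }
    }
}
```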

Graceful Shutdown

On SIGTERM or Ctrl-C the worker stops claiming new jobs, waits for in-flight jobs to finish, and resets any claimed rows it held back to pending — those jobs will be claimed by the next worker instance.
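The shutdown steps map onto a stop flag plus a release step. A minimal sketch, assuming a hypothetical `claimed_by` column that records which worker holds a row; signal handling and the wait for in-flight jobs are left out, and none of these names are Ferro's API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Claimed,
}

// Hypothetical row shape: `claimed_by` identifies the holding worker.
struct JobRow {
    status: Status,
    claimed_by: Option<u32>,
}

struct Worker {
    id: u32,
    stopping: AtomicBool,
}

impl Worker {
    /// Called on SIGTERM / Ctrl-C: stop taking new work.
    fn begin_shutdown(&self) {
        self.stopping.store(true, Ordering::SeqCst);
    }

    /// The claim loop checks this before every claim cycle.
    fn may_claim(&self) -> bool {
        !self.stopping.load(Ordering::SeqCst)
    }

    /// After in-flight jobs finish: hand back rows this worker still holds.
    /// Returns how many rows were reset to pending.
    fn release_claims(&self, rows: &mut [JobRow]) -> usize {
        let mut released = 0;
        for row in rows
            .iter_mut()
            .filter(|r| r.status == Status::Claimed && r.claimed_by == Some(self.id))
        {
            row.status = Status::Pending;
            row.claimed_by = None;
            released += 1;
        }
        released
    }
}
```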

Failed Job Inspection

Failed jobs are stored in the jobs table with status = 'failed'. Inspect them via the debug endpoint or ferro-mcp:

# Debug endpoint (requires APP_ENV=local or DEBUG_MODE=true)
curl http://localhost:3000/_ferro/queue/stats
curl http://localhost:3000/_ferro/queue/jobs

Migration Guide

The following table maps the previous external-broker API to the current DB-backed API.

| Old API | New (DB) | Notes |
|---|---|---|
| Queue::init(QueueConfig::new(broker_url)) | Queue::register::<J>() in bootstrap; framework auto-inits | Connection injected at bootstrap from the app DB |
| Separate worker process / cargo run --bin worker | WorkerLoop auto-started inside Application::run | Single binary, work-stealing |
| External broker env vars (HOST, PORT, PASSWORD) | None required | Queue uses the app's DATABASE_URL |
| failed_jobs table | jobs WHERE status='failed' | Single table, error recorded inline |
| 2^attempt fixed backoff | Full-jitter exponential default | Override via Job::retry_delay |
| No deduplication hook | Job::idempotency_key() | Dedup on enqueue when Some |
| QueueConnection type | Removed | Queue::connection() returns &DatabaseConnection |

Gestiscilo Consumer Migration (Phase 188)

The following job types migrate in gestiscilo Phase 188. Each keeps its Job implementation unchanged; only the job registration and the Migrator's migration list change:

| Job | Old registration | New registration |
|---|---|---|
| RenderDocumentPdfJob | worker.register::<RenderDocumentPdfJob>() in worker binary | Queue::register::<RenderDocumentPdfJob>() in bootstrap |
| SendBookingReminderJob | worker.register::<SendBookingReminderJob>() in worker binary | Queue::register::<SendBookingReminderJob>() in bootstrap |
| DeliverNotificationJob | worker.register::<DeliverNotificationJob>() in worker binary | Queue::register::<DeliverNotificationJob>() in bootstrap |
| screenshot_worker | separate process binary | Queue::register::<ScreenshotJob>() in bootstrap |

Add Box::new(ferro_queue::CreateJobsTable) to your Migrator::migrations() list (one-time migration). The failed_jobs table (if present) can be dropped after migration — failed job history is now in jobs WHERE status='failed'.

MCP Tools

Use these tools to monitor and debug queue state during development and in running applications.

list_jobs

Returns all Job implementations found in src/jobs/, including the job struct name, max retries, and timeout configuration. Use this to audit what jobs exist before dispatching or debugging failures.

job_history

Returns recent failed job history from jobs WHERE status='failed': job name, error message, attempt count, and timestamp. Use this to diagnose jobs that are permanently failing.

queue_status

Returns current queue depth and pending job counts per queue name. Use this to check whether a queue is backed up.