
background-job-processing

aj-geddes
Updated Today

About

This skill implements background job processing systems using task queues and worker pools for handling asynchronous operations. It enables scheduling, retry mechanisms, and distributed processing of long-running tasks like email sending, report generation, and large dataset processing. Developers should use it to offload time-consuming operations from main application threads.

Documentation

Background Job Processing

Overview

Build robust background job processing systems with distributed task queues, worker pools, job scheduling, error handling, retry policies, and monitoring for efficient asynchronous task execution.

When to Use

  • Handling long-running operations asynchronously
  • Sending emails in the background
  • Generating reports or exports
  • Processing large datasets
  • Scheduling recurring tasks
  • Distributing compute-intensive operations

Instructions

1. Python with Celery and Redis

# celery_app.py
from celery import Celery
from kombu import Exchange, Queue
import os

app = Celery('myapp')

# Configuration
app.conf.update(
    broker_url=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    result_backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes
    task_soft_time_limit=25 * 60,  # 25 minutes
    broker_connection_retry_on_startup=True,
)

# Queue configuration
default_exchange = Exchange('tasks', type='direct')
app.conf.task_queues = (
    Queue('default', exchange=default_exchange, routing_key='default'),
    Queue('emails', exchange=default_exchange, routing_key='emails'),
    Queue('reports', exchange=default_exchange, routing_key='reports'),
    Queue('batch', exchange=default_exchange, routing_key='batch'),
)

app.conf.task_routes = {
    'tasks.send_email': {'queue': 'emails'},
    'tasks.generate_report': {'queue': 'reports'},
    'tasks.process_batch': {'queue': 'batch'},
}

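# Default retry policy for every task. max_retries and default_retry_delay
# are task attributes, so they are applied through task_annotations.
app.conf.task_annotations = {
    '*': {'max_retries': 3, 'default_retry_delay': 60},
}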

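# Import the task module when the worker starts so tasks register under the
# 'tasks.*' names used in task_routes (tasks.py sits next to celery_app.py)
app.conf.imports = ('tasks',)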

# tasks.py
from celery_app import app
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, user_id, email_subject):
    """Send email task with retry logic"""
    try:
        user = User.query.get(user_id)
        if not user:
            logger.error(f"User {user_id} not found")
            return {'status': 'failed', 'reason': 'User not found'}

        # Send email logic
        send_email_helper(user.email, email_subject)

        return {'status': 'success', 'user_id': user_id}

    except Exception as exc:
        logger.error(f"Error sending email: {exc}")
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@shared_task(bind=True)
def generate_report(self, report_type, filters):
    """Generate report with progress tracking"""
    try:
        self.update_state(
            state='PROGRESS',
            meta={'current': 0, 'total': 100, 'status': 'Initializing...'}
        )

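        total_records = count_records(filters)
        processed = 0

        for batch in fetch_records_in_batches(filters, batch_size=1000):
            # build_report_rows is an application helper (not shown); it must not
            # share a name with the process_batch task defined below
            build_report_rows(batch, report_type)
            processed += len(batch)

            # Update progress (guard against an empty result set)
            progress = int((processed / total_records) * 100) if total_records else 100
            self.update_state(
                state='PROGRESS',
                meta={'current': processed, 'total': total_records, 'progress': progress}
            )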

        return {'status': 'success', 'total_records': total_records}

    except SoftTimeLimitExceeded as exc:
        logger.error("Report generation exceeded the soft time limit")
        raise RuntimeError("Report generation timed out") from exc

@shared_task(bind=True)
def process_batch(self, batch_data):
    """Process large batch operations"""
    results = []
    for item in batch_data:
        try:
            result = process_item(item)
            results.append(result)
        except Exception as e:
            logger.error(f"Error processing item {item}: {e}")
            results.append({'status': 'failed', 'error': str(e)})

    return {'processed': len(results), 'results': results}

# Periodic tasks with Beat scheduler
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_expired_sessions',
        'schedule': crontab(minute=0, hour='*/6'),  # Every 6 hours
        'args': ()
    },
    'generate-daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=0, minute=0),  # Daily at midnight
        'args': ()
    },
    'sync-external-data': {
        'task': 'tasks.sync_external_data',
        'schedule': crontab(minute=0),  # Every hour
        'args': ()
    },
}

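from datetime import datetime

# Session and db are the application's own model and database handle (not shown)
@shared_task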
def cleanup_expired_sessions():
    """Cleanup expired sessions"""
    deleted_count = Session.query.filter(
        Session.expires_at < datetime.utcnow()
    ).delete()
    db.session.commit()
    return {'deleted': deleted_count}

@shared_task
def sync_external_data():
    """Sync data from external API"""
    try:
        data = fetch_from_external_api()
        for item in data:
            update_or_create_record(item)
        return {'status': 'success', 'synced_items': len(data)}
    except Exception as e:
        logger.error(f"Sync failed: {e}")
        raise

# Flask integration
from flask import Blueprint, jsonify, request

celery_bp = Blueprint('celery', __name__, url_prefix='/api/tasks')

@celery_bp.route('/<task_id>/status', methods=['GET'])
def task_status(task_id):
    """Get task status"""
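    result = app.AsyncResult(task_id)
    return jsonify({
        'task_id': task_id,
        'status': result.status,
        # result.result holds the exception when a task fails, so return it only on success
        'result': result.result if result.successful() else None,
        'error': str(result.result) if result.failed() else None,
        'progress': result.info if result.state == 'PROGRESS' else None
    })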

@celery_bp.route('/send-email', methods=['POST'])
def trigger_email():
    """Trigger email sending task"""
    data = request.json
    task = send_email.delay(data['user_id'], data['subject'])
    return jsonify({'task_id': task.id}), 202
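
The report task above relies on a batching helper (`fetch_records_in_batches`) and a percentage calculation that the source does not show. The following is a minimal sketch of how such helpers might look, assuming the records arrive as a plain Python iterable; `in_batches` and `progress_percent` are hypothetical names, not part of Celery.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def in_batches(items: Iterable[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `batch_size` items (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls the next batch lazily, so the full dataset is never in memory
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def progress_percent(processed: int, total: int) -> int:
    """Return progress as 0-100, treating an empty job as complete."""
    if total <= 0:
        return 100
    return min(100, int(processed / total * 100))


batches = list(in_batches(range(5), batch_size=2))
```

Because the generator consumes its input lazily, memory use is bounded by `batch_size` rather than by the size of the dataset.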

2. Node.js with Bull Queue

// queue.js
const Queue = require('bull');

// Shared Redis connection settings; Bull creates its own clients from these,
// so no separate redis client is needed here
const redisConfig = {
    host: process.env.REDIS_HOST || 'localhost',
    port: Number(process.env.REDIS_PORT) || 6379
};

// Create job queues
const emailQueue = new Queue('emails', { redis: redisConfig });
const reportQueue = new Queue('reports', { redis: redisConfig });
const batchQueue = new Queue('batch', { redis: redisConfig });

// Process email jobs
emailQueue.process(5, async (job) => {
    const { userId, subject, body } = job.data;

    try {
        const user = await User.findById(userId);
        if (!user) {
            throw new Error(`User ${userId} not found`);
        }

        await sendEmailHelper(user.email, subject, body);

        return { status: 'success', userId };
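    } catch (error) {
        // Rethrow so Bull marks this attempt as failed. Retries and backoff
        // come from the attempts/backoff options passed to emailQueue.add().
        throw error;
    }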
});

// Process report jobs with progress
reportQueue.process(async (job) => {
    const { reportType, filters } = job.data;
    const totalRecords = await countRecords(filters);

    for (let i = 0; i < totalRecords; i += 1000) {
        const batch = await fetchRecordsBatch(filters, i, 1000);
        await processBatch(batch, reportType);

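        // Update progress after the batch completes so the last batch reports 100%
        const processed = Math.min(i + 1000, totalRecords);
        await job.progress(Math.round((processed / totalRecords) * 100));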
    }

    return { status: 'success', totalRecords };
});

// Process batch jobs
batchQueue.process(async (job) => {
    const { items } = job.data;
    const results = [];

    for (const item of items) {
        try {
            const result = await processItem(item);
            results.push(result);
        } catch (error) {
            results.push({ status: 'failed', error: error.message });
        }
    }

    return { processed: results.length, results };
});

// Event listeners
emailQueue.on('completed', (job) => {
    console.log(`Email job ${job.id} completed`);
});

emailQueue.on('failed', (job, err) => {
    console.error(`Email job ${job.id} failed:`, err.message);
});

// Only report jobs call job.progress(), so listen on reportQueue
reportQueue.on('progress', (job, progress) => {
    console.log(`Report job ${job.id} ${progress}% complete`);
});

module.exports = {
    emailQueue,
    reportQueue,
    batchQueue
};

// routes.js
const express = require('express');
const { emailQueue, reportQueue } = require('./queue');

const router = express.Router();

// Trigger email job
router.post('/send-email', async (req, res) => {
    const { userId, subject, body } = req.body;

    const job = await emailQueue.add(
        { userId, subject, body },
        {
            attempts: 3,
            backoff: {
                type: 'exponential',
                delay: 2000
            },
            removeOnComplete: true
        }
    );

    res.status(202).json({ jobId: job.id });
});

// Get job status
router.get('/jobs/:jobId/status', async (req, res) => {
    const job = await emailQueue.getJob(req.params.jobId);

    if (!job) {
        return res.status(404).json({ error: 'Job not found' });
    }

    const progress = await job.progress();
    const state = await job.getState();
    const attempts = job.attemptsMade;

    res.json({
        jobId: job.id,
        state,
        progress,
        attempts,
        data: job.data
    });
});

module.exports = router;

3. Ruby with Sidekiq

# Gemfile
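gem 'sidekiq', '~> 7.0'
gem 'redis'
gem 'sidekiq-scheduler'
gem 'sidekiq-status'       # provides Sidekiq::Status, used in TasksController#job_status
gem 'sidekiq-unique-jobs'  # provides the lock: option used in EmailWorker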

# config/sidekiq.yml
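---
# Redis is configured through REDIS_URL or Sidekiq.configure_server /
# Sidekiq.configure_client in an initializer, not in this file.
:concurrency: 5
:timeout: 25
:max_retries: 3
# Queues must be listed here (or passed with -q); otherwise only 'default' is processed
:queues:
  - default
  - emails
  - reports
# Jobs that exhaust their retries move to Sidekiq's built-in Dead set.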

# app/workers/email_worker.rb
class EmailWorker
  include Sidekiq::Worker
  sidekiq_options queue: 'emails', retry: 3, lock: :until_executed

  def perform(user_id, subject)
    user = User.find(user_id)
    UserMailer.send_email(user, subject).deliver_now

    logger.info "Email sent to user #{user_id}"
  rescue StandardError => e
    logger.error "Failed to send email: #{e.message}"
    raise
  end
end

# app/workers/report_worker.rb
class ReportWorker
  include Sidekiq::Worker
  sidekiq_options queue: 'reports', retry: 2

  def perform(report_type, filters)
    total_records = Record.filter_by(filters).count
    processed = 0

    Record.filter_by(filters).find_in_batches(batch_size: 1000) do |batch|
      process_batch(batch, report_type)
      processed += batch.size

      # Update progress
      Sidekiq.redis { |conn|
        conn.hset("job:#{jid}", 'progress', (processed.to_f / total_records * 100).round(2))
      }
    end

    logger.info "Report #{report_type} generated"
    { status: 'success', total_records: total_records }
  end
end

# app/controllers/tasks_controller.rb
class TasksController < ApplicationController
  def send_email
    user_id = params[:user_id]
    subject = params[:subject]

    job_id = EmailWorker.perform_async(user_id, subject)
    render json: { job_id: job_id }, status: :accepted
  end

  def job_status
    job_id = params[:job_id]
    # Provided by the sidekiq-status gem; returns nil for unknown or expired jobs
    status = Sidekiq::Status.status(job_id)

    render json: {
      job_id: job_id,
      status: status || 'not_found'
    }
  end
end

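# Scheduled jobs (sidekiq-scheduler, in config/sidekiq.yml)
# Recent sidekiq-scheduler releases nest the schedule under :scheduler:;
# older releases read a top-level :schedule: key instead.
:scheduler:
  :schedule:
    cleanup_expired_sessions:
      cron: '0 */6 * * *'
      class: CleanupSessionsWorker
    generate_daily_report:
      cron: '0 0 * * *'
      class: DailyReportWorker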

4. Job Retry and Error Handling

# Retry strategies
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
import logging
import random

logger = logging.getLogger(__name__)

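class TemporaryError(Exception):
    """Placeholder for a transient failure, e.g. a network timeout"""

class PermanentError(Exception):
    """Placeholder for a failure that retrying cannot fix, e.g. invalid input"""

# autoretry_for is omitted on purpose: retries are issued explicitly below,
# and combining both would retry the permanent-failure path as well
@shared_task(bind=True, max_retries=5)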
def resilient_task(self, data):
    """Task with advanced retry logic"""
    try:
        # Attempt task
        result = perform_operation(data)
        return result

    except TemporaryError as exc:
        # Retry with exponential backoff
        retry_delay = min(2 ** self.request.retries * 60, 3600)
        raise self.retry(exc=exc, countdown=retry_delay)

    except PermanentError as exc:
        logger.error(f"Permanent error in task {self.request.id}: {exc}")
        # Don't retry, just log and fail
        return {'status': 'failed', 'error': str(exc)}

    except Exception as exc:
        if self.request.retries < self.max_retries:
            logger.warning(f"Retrying task {self.request.id}, attempt {self.request.retries + 1}")
            # Add jitter to prevent thundering herd
            jitter = random.uniform(0, 10)
            raise self.retry(exc=exc, countdown=60 + jitter)
        else:
            raise MaxRetriesExceededError(f"Task {self.request.id} failed after {self.max_retries} retries") from exc
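
The retry delays above combine exponential growth, a cap, and random jitter. As a standalone illustration, here is a minimal sketch of a capped "full jitter" delay calculation; `backoff_ceiling` and `backoff_delay` are hypothetical helpers, and the 60-second base and 3600-second cap simply mirror the values used in the task above.

```python
import random


def backoff_ceiling(retries: int, base: float = 60.0, cap: float = 3600.0) -> float:
    """Upper bound of the delay for a given retry count: base * 2^retries, capped."""
    if retries < 0:
        raise ValueError("retries must be non-negative")
    return min(cap, base * (2 ** retries))


def backoff_delay(retries: int, base: float = 60.0, cap: float = 3600.0,
                  rng: random.Random = None) -> float:
    """Pick a random delay in [0, ceiling] so retries from many tasks spread out."""
    rng = rng or random.Random()
    return rng.uniform(0, backoff_ceiling(retries, base, cap))


# A seeded generator keeps this demonstration repeatable
demo_rng = random.Random(0)
delays = [backoff_delay(n, rng=demo_rng) for n in range(8)]
```

The result could be passed as `countdown=` to `self.retry(...)`. Spreading the delay over the whole interval trades a shorter average wait for less synchronized retry traffic.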

5. Monitoring and Observability

# monitoring.py
from prometheus_client import Counter, Histogram, Gauge
from celery import shared_task
import functools
import time

# Metrics
task_counter = Counter('celery_task_total', 'Total tasks', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', 'Task duration', ['task_name'])
task_queue_size = Gauge('celery_queue_size', 'Queue size', ['queue_name'])

def track_task_metrics(task_name):
    def decorator(func):
        # functools.wraps keeps the original function name, which Celery uses
        # to name the task; without it every tracked task would be called "wrapper"
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                task_counter.labels(task_name=task_name, status='success').inc()
                return result
            except Exception:
                task_counter.labels(task_name=task_name, status='failed').inc()
                raise
            finally:
                duration = time.time() - start_time
                task_duration.labels(task_name=task_name).observe(duration)
        return wrapper
    return decorator

@shared_task
@track_task_metrics('send_email')
def send_email_tracked(user_id, subject):
    # Task implementation
    pass
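
The `celery_queue_size` gauge above is declared but never updated. One way to feed it is sketched below. It assumes the Redis broker keeps each queue as a Redis list named after the queue, which is worth verifying for your broker and priority settings. `collect_queue_depths`, `publish_queue_depths`, and `FakeRedis` are hypothetical names; the only client method assumed is a Redis-style `llen`.

```python
from typing import Callable, Dict, Iterable, Protocol


class SupportsLlen(Protocol):
    """Anything with a Redis-style LLEN call, such as a redis-py client."""

    def llen(self, name: str) -> int: ...


def collect_queue_depths(client: SupportsLlen, queue_names: Iterable[str]) -> Dict[str, int]:
    """Return the number of pending messages per queue."""
    return {name: int(client.llen(name)) for name in queue_names}


def publish_queue_depths(depths: Dict[str, int], set_gauge: Callable[[str, int], None]) -> None:
    """Hand each depth to a callback, e.g. one that sets a Prometheus gauge."""
    for name, depth in depths.items():
        set_gauge(name, depth)


class FakeRedis:
    """Stand-in client for this demonstration; maps list names to Python lists."""

    def __init__(self, lists: Dict[str, list]) -> None:
        self._lists = lists

    def llen(self, name: str) -> int:
        return len(self._lists.get(name, []))


fake = FakeRedis({"emails": ["job-1", "job-2"], "reports": []})
depths = collect_queue_depths(fake, ["emails", "reports", "batch"])

published = []
publish_queue_depths(depths, lambda queue, depth: published.append((queue, depth)))
```

With the gauge defined above, the callback could be `lambda queue, depth: task_queue_size.labels(queue_name=queue).set(depth)`, called periodically from a scheduled task.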

Best Practices

✅ DO

  • Use task timeouts to prevent hanging jobs
  • Implement retry logic with exponential backoff
  • Make tasks idempotent
  • Use job priorities for critical tasks
  • Monitor queue depths and job failures
  • Log job execution details
  • Clean up completed jobs
  • Set appropriate batch sizes for memory efficiency
  • Use dead-letter queues for failed jobs
  • Test jobs independently
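
Idempotency matters because retries and redelivery can run the same job more than once. A minimal sketch of the idea follows, using an in-memory dict as the result store. That is an assumption for illustration only: a real deployment would need a store shared between workers, such as a database unique constraint or an atomic Redis write, to avoid races. `run_once` is a hypothetical helper.

```python
from typing import Any, Callable, Dict


def run_once(store: Dict[str, Any], idempotency_key: str, operation: Callable[[], Any]) -> Any:
    """Run `operation` only if no result is recorded for `idempotency_key`."""
    if idempotency_key in store:
        # A previous attempt already completed; return its result without side effects
        return store[idempotency_key]
    result = operation()
    store[idempotency_key] = result
    return result


sent = []  # stands in for an external side effect such as sending an email


def send_welcome_email() -> str:
    sent.append("welcome:user-42")
    return "sent"


results: Dict[str, Any] = {}
first = run_once(results, "welcome-email:user-42", send_welcome_email)
second = run_once(results, "welcome-email:user-42", send_welcome_email)  # simulated retry
```

The key should identify the logical operation (here, the email type plus the user) rather than the job ID, since a retried or re-enqueued job may carry a different ID.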

❌ DON'T

  • Use synchronous operations in async tasks
  • Ignore job failures
  • Make tasks dependent on external state
  • Use unbounded retries
  • Store large objects in job data
  • Forget to handle timeouts
  • Run jobs without monitoring
  • Use blocking operations in queues
  • Forget to track job progress
  • Mix unrelated operations in one job
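
To illustrate the advice against storing large objects in job data, here is a minimal sketch that enqueues a reference to the data rather than the data itself and rejects oversized payloads. The 8 KiB budget, the `storage_key` field, and the function names are hypothetical choices for illustration, not limits imposed by any queue library.

```python
import json

MAX_PAYLOAD_BYTES = 8 * 1024  # hypothetical budget; tune for your broker


def payload_size_bytes(payload: dict) -> int:
    """Size of the payload once serialized as compact JSON."""
    return len(json.dumps(payload, separators=(",", ":")).encode("utf-8"))


def build_report_job(report_id: int, storage_key: str) -> dict:
    """Build job data that points at the dataset instead of embedding it."""
    payload = {"report_id": report_id, "storage_key": storage_key}
    size = payload_size_bytes(payload)
    if size > MAX_PAYLOAD_BYTES:
        raise ValueError(f"job payload is {size} bytes, limit is {MAX_PAYLOAD_BYTES}")
    return payload


job_data = build_report_job(42, "exports/42.csv")
```

The worker then loads the dataset from storage using `storage_key`, which keeps broker memory use small and lets the job see current data at execution time.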

Complete Example

from celery import shared_task
from celery_app import app

@shared_task
def simple_task(x, y):
    return x + y

# Trigger task (needs a running worker and the result backend from celery_app.py)
result = simple_task.delay(4, 6)
print(result.get(timeout=10))  # 10

Quick Install

/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/background-job-processing

Copy and paste this command in Claude Code to install this skill

GitHub Repository

aj-geddes/useful-ai-prompts
Path: skills/background-job-processing
