batch-processing-jobs

aj-geddes

About

This skill provides patterns for implementing robust batch processing systems using job queues, schedulers, and distributed workers. It's designed for handling large datasets, scheduled tasks, resource-intensive computations, and asynchronous operations. Key capabilities include scalable background task processing and efficient management of long-running jobs like ETL pipelines and bulk data updates.

Documentation

Batch Processing Jobs

Overview

Implement scalable batch systems for large-scale data processing, scheduled tasks, and asynchronous operations.

When to Use

  • Processing large datasets
  • Scheduled report generation
  • Email/notification campaigns
  • Data imports and exports
  • Image/video processing
  • ETL pipelines
  • Cleanup and maintenance tasks
  • Long-running computations
  • Bulk data updates

Architecture Patterns

┌─────────────┐      ┌─────────────┐      ┌──────────┐
│  Producer   │─────▶│    Queue    │─────▶│  Worker  │
└─────────────┘      └─────────────┘      └──────────┘
                            │                  │
                            │                  ▼
                            │             ┌──────────┐
                            └────────────▶│  Result  │
                                          │ Storage  │
                                          └──────────┘
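
Producers enqueue job descriptions rather than doing the work inline; workers pull jobs off the queue and write outcomes to result storage that callers can poll. A minimal in-memory sketch of this flow (illustrative names only, no queue library):

type Job = { id: number; payload: string };
type JobOutcome = { id: number; output: string };

const queue: Job[] = [];                           // Queue: pending jobs
const resultStore = new Map<number, JobOutcome>(); // Result storage

// Producer: enqueue a job description, not the work itself
function produce(id: number, payload: string): void {
  queue.push({ id, payload });
}

// Worker: drain the queue and persist each outcome
async function runWorker(): Promise<void> {
  let job: Job | undefined;
  while ((job = queue.shift()) !== undefined) {
    resultStore.set(job.id, { id: job.id, output: job.payload.toUpperCase() });
  }
}

produce(1, 'hello');
produce(2, 'world');
await runWorker();
console.log(resultStore.get(1)); // { id: 1, output: 'HELLO' }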

Implementation Examples

1. Bull Queue (Node.js)

import Queue from 'bull';
import { v4 as uuidv4 } from 'uuid';

interface JobData {
  id: string;
  type: string;
  payload: any;
  userId?: string;
  metadata?: Record<string, any>;
}

interface JobResult {
  jobId?: Queue.JobId;
  success: boolean;
  data?: any;
  error?: string;
  processedAt: number;
  duration: number;
}

class BatchProcessor {
  private queue: Queue.Queue<JobData>;
  private resultQueue: Queue.Queue<JobResult>;

  constructor(redisUrl: string) {
    // Main processing queue
    this.queue = new Queue('batch-jobs', redisUrl, {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: 1000,
        removeOnFail: 5000,
        timeout: 300000 // 5 minutes
      },
      settings: {
        maxStalledCount: 2,
        stalledInterval: 30000
      }
    });

    // Results queue
    this.resultQueue = new Queue('batch-results', redisUrl);

    this.setupProcessors();
    this.setupEvents();
  }

  private setupProcessors(): void {
    // Data processing job
    this.queue.process('process-data', 10, async (job) => {
      const startTime = Date.now();
      const { payload } = job.data;

      job.log(`Processing ${payload.items?.length || 0} items`);

      try {
        // Update progress
        await job.progress(0);

        const results = await this.processDataBatch(
          payload.items,
          (progress) => job.progress(progress)
        );

        const duration = Date.now() - startTime;

        return {
          success: true,
          data: results,
          processedAt: Date.now(),
          duration
        };
      } catch (error: any) {
        throw new Error(`Processing failed: ${error.message}`);
      }
    });

    // Report generation job
    this.queue.process('generate-report', 2, async (job) => {
      const startTime = Date.now();
      const { payload } = job.data;

      const report = await this.generateReport(
        payload.type,
        payload.filters,
        payload.format
      );

      return {
        success: true,
        data: {
          reportId: uuidv4(),
          url: report.url,
          size: report.size
        },
        processedAt: Date.now(),
        duration: Date.now() - startTime
      };
    });

    // Email batch job
    this.queue.process('send-emails', 5, async (job) => {
      const startTime = Date.now();
      const { payload } = job.data;
      const { recipients, template, data } = payload;

      const results = await this.sendEmailBatch(
        recipients,
        template,
        data
      );

      return {
        success: true,
        data: {
          sent: results.successful,
          failed: results.failed
        },
        processedAt: Date.now(),
        duration: Date.now() - startTime
      };
    });
  }

  private setupEvents(): void {
    this.queue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed:`, result);

      // Store result
      this.resultQueue.add({
        jobId: job.id,
        ...result
      });
    });

    this.queue.on('failed', (job, error) => {
      console.error(`Job ${job?.id} failed:`, error.message);

      // Store failure
      this.resultQueue.add({
        jobId: job?.id,
        success: false,
        error: error.message,
        processedAt: Date.now(),
        duration: 0
      });
    });

    this.queue.on('progress', (job, progress) => {
      console.log(`Job ${job.id} progress: ${progress}%`);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled`);
    });
  }

  async addJob(
    type: string,
    payload: any,
    options?: Queue.JobOptions
  ): Promise<Queue.Job<JobData>> {
    const jobData: JobData = {
      id: uuidv4(),
      type,
      payload,
      metadata: {
        createdAt: Date.now()
      }
    };

    return this.queue.add(type, jobData, options);
  }

  async addBulkJobs(
    jobs: Array<{ type: string; payload: any; options?: Queue.JobOptions }>
  ): Promise<Queue.Job<JobData>[]> {
    const bulkData = jobs.map(({ type, payload, options }) => ({
      name: type,
      data: {
        id: uuidv4(),
        type,
        payload,
        metadata: { createdAt: Date.now() }
      },
      opts: options || {}
    }));

    return this.queue.addBulk(bulkData);
  }

  async scheduleJob(
    type: string,
    payload: any,
    cronExpression: string
  ): Promise<Queue.Job<JobData>> {
    return this.addJob(type, payload, {
      repeat: {
        cron: cronExpression
      }
    });
  }

  private async processDataBatch(
    items: any[],
    onProgress: (progress: number) => Promise<void>
  ): Promise<any[]> {
    const results = [];
    const total = items.length;

    for (let i = 0; i < total; i++) {
      const result = await this.processItem(items[i]);
      results.push(result);

      // Update progress
      const progress = Math.round(((i + 1) / total) * 100);
      await onProgress(progress);
    }

    return results;
  }

  private async processItem(item: any): Promise<any> {
    // Simulate processing
    await new Promise(resolve => setTimeout(resolve, 100));
    return { ...item, processed: true };
  }

  private async generateReport(
    type: string,
    filters: any,
    format: string
  ): Promise<any> {
    // Simulate report generation
    return {
      url: `https://cdn.example.com/reports/${uuidv4()}.${format}`,
      size: 1024 * 1024
    };
  }

  private async sendEmailBatch(
    recipients: string[],
    template: string,
    data: any
  ): Promise<{ successful: number; failed: number }> {
    // Simulate email sending
    return {
      successful: recipients.length,
      failed: 0
    };
  }

  async getJobStatus(jobId: string): Promise<any> {
    const job = await this.queue.getJob(jobId);
    if (!job) return null;

    const state = await job.getState();
    const logs = await this.queue.getJobLogs(jobId);

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      state,
      progress: job.progress(),
      attempts: job.attemptsMade,
      failedReason: job.failedReason,
      finishedOn: job.finishedOn,
      processedOn: job.processedOn,
      logs: logs.logs
    };
  }

  async getQueueStats(): Promise<any> {
    const [
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    ] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount(),
      this.queue.getPausedCount()
    ]);

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    };
  }

  async pause(): Promise<void> {
    await this.queue.pause();
  }

  async resume(): Promise<void> {
    await this.queue.resume();
  }

  async clean(grace: number = 0): Promise<void> {
    await this.queue.clean(grace, 'completed');
    await this.queue.clean(grace, 'failed');
  }

  async close(): Promise<void> {
    await this.queue.close();
    await this.resultQueue.close();
  }
}

// Usage
const processor = new BatchProcessor('redis://localhost:6379');

// Add single job
const job = await processor.addJob('process-data', {
  items: [{ id: 1 }, { id: 2 }, { id: 3 }]
});

// Add bulk jobs
await processor.addBulkJobs([
  {
    type: 'process-data',
    payload: { items: [/* ... */] }
  },
  {
    type: 'generate-report',
    payload: { type: 'sales', format: 'pdf' }
  }
]);

// Schedule recurring job
await processor.scheduleJob(
  'generate-report',
  { type: 'daily-summary' },
  '0 0 * * *' // Daily at midnight
);

// Check status
const status = await processor.getJobStatus(String(job.id));
console.log('Job status:', status);

// Get queue stats
const stats = await processor.getQueueStats();
console.log('Queue stats:', stats);
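
Workers should also shut down gracefully so in-flight jobs are not lost. A minimal sketch wired to the BatchProcessor above, using standard Node.js signal handlers (Bull's close() waits for active jobs before resolving):

// Graceful shutdown sketch: let in-flight jobs finish before exiting.
async function shutdown(signal: string): Promise<void> {
  console.log(`Received ${signal}, closing queues...`);
  await processor.close(); // waits for active jobs to complete
  process.exit(0);
}

process.on('SIGTERM', () => void shutdown('SIGTERM'));
process.on('SIGINT', () => void shutdown('SIGINT'));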

2. Celery-Style Worker (Python)

from celery import Celery, Task
from celery.schedules import crontab
from typing import List, Any, Dict
import time
import logging

# Initialize Celery
app = Celery(
    'batch_processor',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configure Celery
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5 minutes
    task_soft_time_limit=270,  # 4.5 minutes
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
)

# Periodic tasks
app.conf.beat_schedule = {
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=0, minute=0),
    },
    'cleanup-old-data': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),
    },
}

logger = logging.getLogger(__name__)


class CallbackTask(Task):
    """Base task with callback support."""

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Task {task_id} succeeded: {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Task {task_id} failed: {exc}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f"Task {task_id} retrying: {exc}")


@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_batch_data(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Process batch of data items."""
    try:
        results = []
        total = len(items)

        for i, item in enumerate(items):
            # Process item
            result = process_single_item(item)
            results.append(result)

            # Update progress
            progress = int((i + 1) / total * 100)
            self.update_state(
                state='PROGRESS',
                meta={'current': i + 1, 'total': total, 'percent': progress}
            )

        return {
            'processed': len(results),
            'success': True,
            'results': results
        }

    except Exception as exc:
        logger.error(f"Batch processing failed: {exc}")
        raise self.retry(exc=exc, countdown=60)  # Retry after 1 minute


@app.task
def process_single_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Process single item."""
    # Simulate processing
    time.sleep(0.1)
    return {
        'id': item.get('id'),
        'processed': True,
        'timestamp': time.time()
    }


@app.task(bind=True)
def generate_report(
    self,
    report_type: str,
    filters: Dict[str, Any],
    format: str = 'pdf'
) -> Dict[str, str]:
    """Generate report."""
    logger.info(f"Generating {report_type} report in {format} format")

    self.update_state(state='PROGRESS', meta={'step': 'gathering_data'})
    # Gather data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'processing'})
    # Process data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'generating'})
    # Generate report
    time.sleep(2)

    return {
        'report_id': f"report-{int(time.time())}",
        'url': f"https://cdn.example.com/reports/report.{format}",
        'format': format
    }


@app.task
def send_email_batch(
    recipients: List[str],
    template: str,
    context: Dict[str, Any]
) -> Dict[str, int]:
    """Send batch of emails."""
    successful = 0
    failed = 0

    for recipient in recipients:
        try:
            send_email(recipient, template, context)
            successful += 1
        except Exception as e:
            logger.error(f"Failed to send email to {recipient}: {e}")
            failed += 1

    return {
        'successful': successful,
        'failed': failed,
        'total': len(recipients)
    }


@app.task
def generate_daily_report():
    """Scheduled task: Generate daily report."""
    logger.info("Generating daily report")
    generate_report.delay('daily', {}, 'pdf')


@app.task
def cleanup_old_data():
    """Scheduled task: Clean up old data."""
    logger.info("Cleaning up old data")
    # Cleanup logic here


def send_email(recipient: str, template: str, context: Dict[str, Any]):
    """Send single email."""
    logger.info(f"Sending email to {recipient}")
    # Email sending logic


# Task chaining and grouping
from celery import group, chord

def process_in_chunks(items: List[Any], chunk_size: int = 100):
    """Process items in parallel chunks."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    # Process chunks in parallel
    job = group(process_batch_data.s(chunk) for chunk in chunks)
    result = job.apply_async()

    return result


def process_with_callback(items: List[Any], chunk_size: int = 100):
    """Process items in chunks and run a callback when all chunks complete."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    callback = send_notification.s()
    header = group(process_batch_data.s(chunk) for chunk in chunks)

    # chord runs the callback once every task in the header has completed
    job = chord(header)(callback)
    return job


@app.task
def send_notification(results):
    """Callback task after batch processing."""
    logger.info(f"All tasks completed: {len(results)} results")


# Usage examples
if __name__ == '__main__':
    # Enqueue task
    result = process_batch_data.delay([
        {'id': 1, 'value': 'a'},
        {'id': 2, 'value': 'b'}
    ])

    # Check task status
    print(f"Task ID: {result.id}")
    print(f"Status: {result.status}")

    # Wait for result (blocking)
    final_result = result.get(timeout=10)
    print(f"Result: {final_result}")

    # Process in chunks
    items = [{'id': i} for i in range(1000)]
    chunk_result = process_in_chunks(items, chunk_size=100)

    # Check group result
    print(f"Chunks: {len(chunk_result)}")

3. Cron Job Scheduler

import cron from 'node-cron';

interface ScheduledJob {
  name: string;
  schedule: string;
  handler: () => Promise<void>;
  enabled: boolean;
  lastRun?: Date;
  nextRun?: Date;
}

class JobScheduler {
  private jobs: Map<string, cron.ScheduledTask> = new Map();
  private jobConfigs: Map<string, ScheduledJob> = new Map();

  register(job: ScheduledJob): void {
    if (this.jobs.has(job.name)) {
      throw new Error(`Job ${job.name} already registered`);
    }

    // Validate cron expression
    if (!cron.validate(job.schedule)) {
      throw new Error(`Invalid cron expression: ${job.schedule}`);
    }

    const task = cron.schedule(job.schedule, async () => {
      if (!job.enabled) return;

      console.log(`Running job: ${job.name}`);
      const startTime = Date.now();

      try {
        await job.handler();

        const duration = Date.now() - startTime;
        console.log(`Job ${job.name} completed in ${duration}ms`);

        job.lastRun = new Date();
      } catch (error) {
        console.error(`Job ${job.name} failed:`, error);
      }
    }, { scheduled: false }); // create the task stopped; start() below controls when it runs

    this.jobs.set(job.name, task);
    this.jobConfigs.set(job.name, job);

    if (job.enabled) {
      task.start();
    }
  }

  start(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }

    task.start();

    const config = this.jobConfigs.get(name)!;
    config.enabled = true;
  }

  stop(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }

    task.stop();

    const config = this.jobConfigs.get(name)!;
    config.enabled = false;
  }

  remove(name: string): void {
    const task = this.jobs.get(name);
    if (task) {
      task.stop(); // node-cron v3 has no destroy(); stop() halts the task before removal
      this.jobs.delete(name);
      this.jobConfigs.delete(name);
    }
  }

  getJobs(): ScheduledJob[] {
    return Array.from(this.jobConfigs.values());
  }
}

// Usage
const scheduler = new JobScheduler();

// Register jobs
scheduler.register({
  name: 'daily-backup',
  schedule: '0 2 * * *', // 2 AM daily
  enabled: true,
  handler: async () => {
    console.log('Running daily backup...');
    // Backup logic
  }
});

scheduler.register({
  name: 'hourly-cleanup',
  schedule: '0 * * * *', // Every hour
  enabled: true,
  handler: async () => {
    console.log('Running cleanup...');
    // Cleanup logic
  }
});

scheduler.register({
  name: 'weekly-report',
  schedule: '0 9 * * 1', // Monday 9 AM
  enabled: true,
  handler: async () => {
    console.log('Generating weekly report...');
    // Report generation
  }
});
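
Note that node-cron fires on every scheduled tick even if the previous run is still going, so a slow handler can overlap itself. A simple guard skips a tick while the last run is active (an illustrative wrapper, not part of node-cron):

// Wrap a handler so a tick is skipped while the previous run is still active.
function noOverlap(name: string, handler: () => Promise<void>): () => Promise<void> {
  let running = false;
  return async () => {
    if (running) {
      console.warn(`Skipping ${name}: previous run still in progress`);
      return;
    }
    running = true;
    try {
      await handler();
    } finally {
      running = false;
    }
  };
}

scheduler.register({
  name: 'slow-sync',
  schedule: '*/5 * * * *', // every 5 minutes
  enabled: true,
  handler: noOverlap('slow-sync', async () => {
    // long-running work that must not overlap with itself
  })
});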

Best Practices

✅ DO

  • Implement idempotency for all jobs (see the sketch after this list)
  • Use job queues for distributed processing
  • Monitor job success/failure rates
  • Implement retry logic with exponential backoff
  • Set appropriate timeouts
  • Log job execution details
  • Use dead letter queues for failed jobs
  • Implement job priority levels
  • Batch similar operations together
  • Use connection pooling
  • Implement graceful shutdown
  • Monitor queue depth and processing time
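
Idempotency, the first item above, usually means checking a stable job ID against a processed set before performing side effects, so retries and duplicate deliveries become no-ops. A minimal in-memory sketch; in production the set would typically be a Redis key or a unique database constraint:

// Idempotent handler sketch: duplicate deliveries and retries become no-ops.
const processedIds = new Set<string>(); // stand-in for Redis/DB in production

async function handleOnce(jobId: string, effect: () => Promise<void>): Promise<void> {
  if (processedIds.has(jobId)) {
    console.log(`Job ${jobId} already processed, skipping`);
    return;
  }
  await effect();
  processedIds.add(jobId); // marked only after success, so a failed run can retry
}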

❌ DON'T

  • Process jobs synchronously in request handlers
  • Ignore failed jobs
  • Set unlimited retries
  • Skip monitoring and alerting
  • Process jobs without timeouts
  • Store large payloads in the queue (enqueue a reference instead; see the sketch below)
  • Forget to clean up completed jobs
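
For the large-payload point above, the usual alternative is to enqueue a small reference and let the worker fetch the actual data. A sketch using the addJob method above (the URI and field name are illustrative):

// Keep queue payloads small: enqueue a reference, not the dataset itself.
await processor.addJob('process-data', {
  // Illustrative reference; any stable key (row ID, object-store URI) works.
  // The worker resolves it to the real data before processing.
  datasetRef: 's3://example-bucket/batches/batch-42.json'
});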

Resources

Quick Install

/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/batch-processing-jobs

Copy and paste this command into Claude Code to install this skill.

GitHub Repository

aj-geddes/useful-ai-prompts
Path: skills/batch-processing-jobs
