Files
aiworker/docs/02-backend/queue-system.md
Hector Ros db71705842 Complete documentation for future sessions
- CLAUDE.md for AI agents to understand the codebase
- GITEA-GUIDE.md centralizes all Gitea operations (API, Registry, Auth)
- DEVELOPMENT-WORKFLOW.md explains complete dev process
- ROADMAP.md, NEXT-SESSION.md for planning
- QUICK-REFERENCE.md, TROUBLESHOOTING.md for daily use
- 40+ detailed docs in /docs folder
- Backend as submodule from Gitea

Everything documented for autonomous operation.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-20 00:37:19 +01:00

12 KiB

# Sistema de Colas con BullMQ

## Setup de BullMQ

// services/queue/config.ts
import { Queue, Worker, QueueScheduler } from 'bullmq'
import { getRedis } from '../../config/redis'
import { logger } from '../../utils/logger'

const connection = getRedis()

/** Creates a BullMQ queue bound to the shared Redis connection. */
const createQueue = (name: string) => new Queue(name, { connection })

/**
 * Every BullMQ queue used by the backend.
 * Each property name matches the Redis queue name it wraps.
 */
export const queues = {
  tasks: createQueue('tasks'),
  deploys: createQueue('deploys'),
  merges: createQueue('merges'),
  cleanup: createQueue('cleanup'),
}

// Queue options
const HOUR_IN_SECONDS = 60 * 60

/**
 * Job options shared by the enqueue helpers.
 *
 * - up to 3 attempts, with exponential backoff starting at 2000 ms;
 * - completed jobs are kept for one hour (and at most 1000 of them);
 * - failed jobs are kept for 24 hours.
 */
export const defaultJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  removeOnComplete: { age: HOUR_IN_SECONDS, count: 1000 },
  removeOnFail: { age: 24 * HOUR_IN_SECONDS },
}

## Task Queue

// services/queue/task-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'

/** Payload of a `process-task` job on the `tasks` queue. */
export interface TaskJob {
  /** Task identifier; enqueueTask also uses it as the BullMQ jobId. */
  taskId: string
  projectId: string
  /** Mapped to a numeric BullMQ priority by enqueueTask (urgent = 1 … low = 4). */
  priority: 'low' | 'medium' | 'high' | 'urgent'
}

/** BullMQ priority per task priority; in BullMQ a lower number is served first. */
const TASK_PRIORITY: Record<TaskJob['priority'], number> = {
  urgent: 1,
  high: 2,
  medium: 3,
  low: 4,
}

/**
 * Adds a task to the `tasks` queue using the shared default job options.
 * The task id doubles as the BullMQ jobId, so it can later be looked up
 * (or removed) by task id.
 */
export async function enqueueTask(data: TaskJob) {
  const { taskId, priority } = data

  const options = {
    ...defaultJobOptions,
    priority: TASK_PRIORITY[priority],
    jobId: taskId,
  }
  await queues.tasks.add('process-task', data, options)

  logger.info(`Task queued: ${taskId}`)
}

/**
 * Removes a queued task (looked up by its task id, which is its jobId).
 * Does nothing if the job no longer exists.
 *
 * NOTE(review): BullMQ refuses to remove a locked (active) job; the rejection
 * from job.remove() propagates to the caller.
 */
export async function dequeueTask(taskId: string) {
  const job = await queues.tasks.getJob(taskId)
  if (!job) return

  await job.remove()
  logger.info(`Task dequeued: ${taskId}`)
}

/**
 * Lists the jobs currently waiting or active in the `tasks` queue.
 *
 * @returns one summary per job: id, payload, current state, progress and the
 *   number of attempts made so far.
 */
export async function getQueuedTasks() {
  const jobs = await queues.tasks.getJobs(['waiting', 'active'])

  // getState() is async, so each mapper must be async and the resulting
  // promises collected with Promise.all (a bare `await` inside a non-async
  // callback does not compile).
  return Promise.all(
    jobs.map(async (job) => ({
      id: job.id,
      data: job.data,
      state: await job.getState(),
      progress: job.progress,
      attemptsMade: job.attemptsMade,
    }))
  )
}

## Deploy Queue

// services/queue/deploy-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'

/** Payload of a `deploy` job on the `deploys` queue. */
export interface DeployJob {
  /** Deployment record id; enqueueDeploy also uses it as the BullMQ jobId. */
  deploymentId: string
  projectId: string
  /** Optional task id; the deploy worker derives the preview namespace from it. */
  taskId?: string
  environment: 'preview' | 'staging' | 'production'
  /** NOTE(review): not read by the deploy worker shown in this document. */
  branch: string
  /** The deploy worker tags the image with the first 7 characters of this hash. */
  commitHash: string
}

/**
 * Adds a deployment to the `deploys` queue with the default job options.
 * Production deploys get priority 1, everything else priority 2; the
 * deployment id is used as the BullMQ jobId.
 */
export async function enqueueDeploy(data: DeployJob) {
  const { deploymentId, environment } = data
  const isProduction = environment === 'production'

  await queues.deploys.add('deploy', data, {
    ...defaultJobOptions,
    priority: isProduction ? 1 : 2,
    jobId: deploymentId,
  })

  logger.info(`Deploy queued: ${environment} - ${deploymentId}`)
}

/**
 * Looks up the BullMQ job for a deployment (jobId === deploymentId).
 *
 * @returns `null` when no such job exists — including jobs already removed by
 *   the removeOnComplete/removeOnFail settings of defaultJobOptions — otherwise
 *   its state, progress, return value and failure reason.
 */
export async function getDeployStatus(deploymentId: string) {
  const job = await queues.deploys.getJob(deploymentId)
  if (!job) {
    return null
  }

  const state = await job.getState()
  const { id, progress, returnvalue: result, failedReason } = job

  return { id, state, progress, result, failedReason }
}

## Merge Queue

// services/queue/merge-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'

/** Payload of a `merge-tasks` job on the `merges` queue. */
export interface MergeJob {
  /** Task-group id; enqueueMerge also uses it as the BullMQ jobId. */
  taskGroupId: string
  projectId: string
  /** Tasks whose pull requests the merge worker squash-merges. */
  taskIds: string[]
  /** enqueueMerge gives `main` priority 1 and `staging` priority 2. */
  targetBranch: 'staging' | 'main'
}

/**
 * Adds a task-group merge to the `merges` queue with the default job options.
 * Merges into `main` get priority 1, merges into `staging` priority 2; the
 * task-group id is used as the BullMQ jobId.
 */
export async function enqueueMerge(data: MergeJob) {
  const { taskGroupId, targetBranch } = data

  let priority = 2
  if (targetBranch === 'main') {
    priority = 1
  }

  await queues.merges.add('merge-tasks', data, {
    ...defaultJobOptions,
    priority,
    jobId: taskGroupId,
  })

  logger.info(`Merge queued: ${taskGroupId}`)
}

## Cleanup Queue

// services/queue/cleanup-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'

/** Payload of jobs on the `cleanup` queue. */
export interface CleanupJob {
  type: 'preview-namespace' | 'old-logs' | 'completed-jobs'
  /**
   * NOTE(review): the recurring jobs registered by scheduleRecurringCleanup do
   * not set this field, and the cleanup worker does not read it — consider
   * making it optional.
   */
  namespaceOrResource: string
  /** Age threshold in hours; resources older than this are cleaned up. */
  ageHours: number
}

/**
 * Adds a one-off cleanup job to the `cleanup` queue.
 * Uses the default job options but with a single attempt (cleanups are not retried).
 */
export async function enqueueCleanup(data: CleanupJob) {
  const options = { ...defaultJobOptions, attempts: 1 }
  await queues.cleanup.add('cleanup', data, options)

  logger.info(`Cleanup queued: ${data.type}`)
}

/**
 * Registers the repeatable cleanup jobs on the `cleanup` queue:
 * - `cleanup-preview-namespaces`: preview namespaces older than 7 days,
 *   cron `0 2 * * *` (daily at 02:00);
 * - `cleanup-old-logs`: logs older than 30 days, cron `0 3 * * 0`
 *   (Sundays at 03:00).
 */
export async function scheduleRecurringCleanup() {
  const schedules = [
    {
      name: 'cleanup-preview-namespaces',
      data: { type: 'preview-namespace', ageHours: 7 * 24 },
      pattern: '0 2 * * *',
    },
    {
      name: 'cleanup-old-logs',
      data: { type: 'old-logs', ageHours: 30 * 24 },
      pattern: '0 3 * * 0',
    },
  ]

  // Registered one at a time, in the order listed above.
  for (const { name, data, pattern } of schedules) {
    await queues.cleanup.add(name, data, { repeat: { pattern } })
  }

  logger.info('Recurring cleanup jobs scheduled')
}

## Workers Implementation

// services/queue/workers.ts
import { Worker, Job } from 'bullmq'
import { getRedis } from '../../config/redis'
import { logger } from '../../utils/logger'
import { db } from '../../db/client'
import { tasks, agents, deployments, projects, agentLogs } from '../../db/schema'
import { eq, lt } from 'drizzle-orm'
import { K8sClient } from '../kubernetes/client'
import { GiteaClient } from '../gitea/client'
import { TaskJob, DeployJob, MergeJob, CleanupJob } from './types'
import { scheduleRecurringCleanup } from './cleanup-queue'

// Module-level Redis connection and API clients shared by the workers below.
// NOTE(review): BullMQ Workers need an ioredis connection created with
// `maxRetriesPerRequest: null`; confirm getRedis() configures it.
const connection = getRedis()
const k8sClient = new K8sClient()
const giteaClient = new GiteaClient()

// ============================================
// TASK WORKER
// ============================================

/**
 * Processor for the `tasks` queue.
 *
 * It does not assign the task itself; it only checks that at least one agent
 * is idle. Agents pick the task up later through MCP `get_next_task`. When no
 * agent is idle it throws, so BullMQ retries the job per its attempts/backoff.
 *
 * NOTE(review): every "no agent" retry consumes an attempt; with
 * defaultJobOptions (attempts: 3) a task that waits too long ends up failed.
 * Consider delaying the job instead of throwing.
 */
async function processTaskJob(job: Job<TaskJob>) {
  logger.info(`Processing task job: ${job.id}`)

  const idleAgent = await db.query.agents.findFirst({
    where: eq(agents.status, 'idle'),
  })

  if (!idleAgent) {
    logger.info('No available agents, task will be retried')
    throw new Error('No available agents')
  }

  logger.info(`Task ${job.data.taskId} ready for agent pickup`)
  return { success: true, readyForPickup: true }
}

const taskWorker = new Worker('tasks', processTaskJob, {
  connection,
  concurrency: 5,
})

taskWorker.on('completed', (job) => {
  logger.info(`Task job completed: ${job.id}`)
})

taskWorker.on('failed', (job, err) => {
  logger.error(`Task job failed: ${job?.id}`, err)
})

// ============================================
// DEPLOY WORKER
// ============================================

/**
 * Worker for the `deploys` queue: rolls a project's image out to Kubernetes
 * and exposes it through an ingress, keeping the `deployments` row in sync.
 *
 * Target namespace: production → `<k8sNamespace>-prod`, staging →
 * `<k8sNamespace>-staging`, preview → `preview-task-<first 8 chars of taskId>`
 * (or the project's base namespace when no taskId is given).
 *
 * The deployment row is set to `in_progress` when processing starts, then to
 * `completed` (with URL and duration) or `failed` (with the error message).
 * On failure the error is rethrown so BullMQ applies the job's retry policy.
 */
const deployWorker = new Worker(
  'deploys',
  async (job: Job<DeployJob>) => {
    const { deploymentId, projectId, environment, commitHash, taskId } = job.data

    logger.info(`Deploying: ${environment} - ${deploymentId}`)

    await db.update(deployments)
      .set({
        status: 'in_progress',
        startedAt: new Date(),
      })
      .where(eq(deployments.id, deploymentId))

    // updateProgress() returns a promise; await it so failures are not lost.
    await job.updateProgress(10)

    try {
      // Filter on the projects table's own key (previously this used
      // deployments.projectId, a column of a different table).
      const project = await db.query.projects.findFirst({
        where: eq(projects.id, projectId),
      })

      if (!project) {
        throw new Error('Project not found')
      }

      await job.updateProgress(20)

      const namespace = environment === 'production'
        ? `${project.k8sNamespace}-prod`
        : environment === 'staging'
        ? `${project.k8sNamespace}-staging`
        : taskId
        ? `preview-task-${taskId.slice(0, 8)}`
        : project.k8sNamespace

      // The deployment, its service and its ingress share one name.
      const resourceName = `${project.name}-${environment}`

      await job.updateProgress(40)

      await k8sClient.createOrUpdateDeployment({
        namespace,
        name: resourceName,
        // Images are tagged with the short (7-character) commit hash.
        image: `${project.dockerImage}:${commitHash.slice(0, 7)}`,
        envVars: project.envVars as Record<string, string>,
        replicas: project.replicas || 1,
        resources: {
          cpu: project.cpuLimit || '500m',
          memory: project.memoryLimit || '512Mi',
        },
      })

      await job.updateProgress(70)

      const url = await k8sClient.createOrUpdateIngress({
        namespace,
        name: resourceName,
        // NOTE(review): every preview of a project gets the same
        // `preview-<name>` host even though namespaces differ per task;
        // confirm this is intended.
        host: environment === 'production'
          ? `${project.name}.aiworker.dev`
          : `${environment}-${project.name}.aiworker.dev`,
        serviceName: resourceName,
        servicePort: 3000,
      })

      await job.updateProgress(90)

      // processedOn is when processing started (ms); fall back to now if unset.
      const processingStartedAt = job.processedOn ?? Date.now()

      await db.update(deployments)
        .set({
          status: 'completed',
          completedAt: new Date(),
          url,
          durationSeconds: Math.floor((Date.now() - processingStartedAt) / 1000),
        })
        .where(eq(deployments.id, deploymentId))

      await job.updateProgress(100)

      logger.info(`Deploy completed: ${environment} - ${url}`)

      return { success: true, url }
    } catch (error) {
      // Catch variables are `unknown` under strict mode; narrow before reading .message.
      const errorMessage = error instanceof Error ? error.message : String(error)

      await db.update(deployments)
        .set({
          status: 'failed',
          errorMessage,
          completedAt: new Date(),
        })
        .where(eq(deployments.id, deploymentId))

      throw error
    }
  },
  {
    connection,
    concurrency: 3,
  }
)

deployWorker.on('failed', (job, err) => {
  logger.error(`Deploy job failed: ${job?.id}`, err)
})

// ============================================
// MERGE WORKER
// ============================================

/**
 * Worker for the `merges` queue: squash-merges the pull request of every task
 * in a task group that has one.
 *
 * Progress: 20 once the tasks are loaded, then advances towards 60 as each
 * task is handled, 100 when done.
 *
 * @throws Error when the project does not exist or has no Gitea owner/repo.
 */
const mergeWorker = new Worker(
  'merges',
  async (job: Job<MergeJob>) => {
    const { taskGroupId, projectId, taskIds, targetBranch } = job.data

    logger.info(`Merging tasks: ${taskIds.join(', ')} to ${targetBranch}`)

    // Filter on the projects table's own key (previously this used
    // deployments.projectId, a column of a different table).
    const project = await db.query.projects.findFirst({
      where: eq(projects.id, projectId),
    })

    if (!project) {
      throw new Error('Project not found')
    }

    // Fail clearly instead of passing undefined to the Gitea client.
    const { giteaOwner, giteaRepoName } = project
    if (!giteaOwner || !giteaRepoName) {
      throw new Error(`Project ${projectId} has no Gitea repository configured`)
    }

    // Nothing to merge; also avoids calling inArray() with an empty list,
    // which some drizzle-orm versions reject.
    if (taskIds.length === 0) {
      logger.info(`Merge ${taskGroupId} has no tasks; nothing to merge`)
      return { success: true }
    }

    const tasksList = await db.query.tasks.findMany({
      where: (table, { inArray }) => inArray(table.id, taskIds),
    })

    await job.updateProgress(20)

    // Merge each PR; progress advances from 20 towards 60 per task handled
    // (previously it was set to the same 20 + 40/n value on every iteration).
    for (const [index, task] of tasksList.entries()) {
      if (task.prNumber) {
        await giteaClient.mergePullRequest(
          giteaOwner,
          giteaRepoName,
          task.prNumber,
          'squash'
        )
      }

      await job.updateProgress(20 + Math.round((40 * (index + 1)) / tasksList.length))
    }

    await job.updateProgress(60)

    // TODO: create the staging/production branch if needed, then trigger the deploy.

    await job.updateProgress(100)

    logger.info(`Merge completed: ${taskGroupId}`)

    return { success: true }
  },
  {
    connection,
    concurrency: 2,
  }
)

mergeWorker.on('failed', (job, err) => {
  logger.error(`Merge job failed: ${job?.id}`, err)
})

// ============================================
// CLEANUP WORKER
// ============================================

/**
 * Worker for the `cleanup` queue (concurrency 1).
 *
 * - `preview-namespace`: delegates to k8sClient.cleanupOldPreviewNamespaces(ageHours);
 * - `old-logs`: deletes agent log rows created more than `ageHours` ago;
 * - `completed-jobs`: not implemented yet — logged and reported as skipped
 *   instead of silently doing nothing.
 *
 * @throws Error for a cleanup type outside CleanupJob['type'].
 */
const cleanupWorker = new Worker(
  'cleanup',
  async (job: Job<CleanupJob>) => {
    const { type, ageHours } = job.data

    logger.info(`Cleanup: ${type}`)

    switch (type) {
      case 'preview-namespace': {
        await k8sClient.cleanupOldPreviewNamespaces(ageHours)
        break
      }

      case 'old-logs': {
        // Braces scope this const to the case clause (no-case-declarations).
        const cutoffDate = new Date(Date.now() - ageHours * 60 * 60 * 1000)
        await db.delete(agentLogs)
          .where(lt(agentLogs.createdAt, cutoffDate))
        break
      }

      case 'completed-jobs': {
        logger.info(`Cleanup type not implemented yet, skipping: ${type}`)
        return { success: true, skipped: true }
      }

      default: {
        // Compile-time exhaustiveness check; runtime guard for malformed payloads.
        const unhandled: never = type
        throw new Error(`Unknown cleanup type: ${String(unhandled)}`)
      }
    }

    logger.info(`Cleanup completed: ${type}`)

    return { success: true }
  },
  {
    connection,
    concurrency: 1,
  }
)

cleanupWorker.on('failed', (job, err) => {
  logger.error(`Cleanup job failed: ${job?.id}`, err)
})

// ============================================
// START ALL WORKERS
// ============================================

/**
 * Finishes worker start-up. The Worker instances are created when this module
 * is loaded, so starting only needs to register the repeatable cleanup jobs.
 *
 * @returns the four worker instances, keyed by name.
 */
export async function startQueueWorkers() {
  logger.info('Starting BullMQ workers...')

  await scheduleRecurringCleanup()

  const workers = { taskWorker, deployWorker, mergeWorker, cleanupWorker }
  logger.info('✓ All workers started')
  return workers
}

/**
 * Graceful shutdown on SIGTERM: closes all four workers in parallel and logs
 * any that fail to close. Promise.allSettled ensures one failing close() does
 * not leave the other workers open or surface as an unhandled rejection.
 * Exits with code 0 when every worker closed, 1 otherwise.
 */
process.on('SIGTERM', async () => {
  logger.info('Shutting down workers...')

  const results = await Promise.allSettled([
    taskWorker.close(),
    deployWorker.close(),
    mergeWorker.close(),
    cleanupWorker.close(),
  ])

  const failures = results.filter(
    (result): result is PromiseRejectedResult => result.status === 'rejected'
  )
  for (const failure of failures) {
    logger.error('Failed to close worker', failure.reason)
  }

  logger.info('Workers shut down')
  process.exit(failures.length === 0 ? 0 : 1)
})

## Monitorización de Colas

// api/routes/queues.ts
import { Router } from 'express'
import { queues } from '../../services/queue/config'

const router = Router()

/**
 * GET /status — job counts for every queue in `queues`.
 *
 * Response: `{ queues: [{ name, waiting, active, completed, failed }] }`.
 * Errors (e.g. Redis unavailable) are passed to Express's error handler via
 * `next`; Express 4 does not catch rejections from async handlers itself.
 */
router.get('/status', async (_req, res, next) => {
  try {
    const status = await Promise.all(
      Object.entries(queues).map(async ([name, queue]) => {
        // A single getJobCounts call per queue instead of four separate count calls.
        const counts = await queue.getJobCounts('waiting', 'active', 'completed', 'failed')
        return {
          name,
          waiting: counts.waiting ?? 0,
          active: counts.active ?? 0,
          completed: counts.completed ?? 0,
          failed: counts.failed ?? 0,
        }
      })
    )

    res.json({ queues: status })
  } catch (err) {
    next(err)
  }
})

export default router