Files
aiworker/docs/02-backend/queue-system.md
Hector Ros db71705842 Complete documentation for future sessions
- CLAUDE.md for AI agents to understand the codebase
- GITEA-GUIDE.md centralizes all Gitea operations (API, Registry, Auth)
- DEVELOPMENT-WORKFLOW.md explains complete dev process
- ROADMAP.md, NEXT-SESSION.md for planning
- QUICK-REFERENCE.md, TROUBLESHOOTING.md for daily use
- 40+ detailed docs in /docs folder
- Backend as submodule from Gitea

Everything documented for autonomous operation.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-20 00:37:19 +01:00

521 lines
12 KiB
Markdown

# Sistema de Colas con BullMQ
## Setup de BullMQ
```typescript
// services/queue/config.ts
import { Queue, Worker, QueueScheduler } from 'bullmq'
import { getRedis } from '../../config/redis'
import { logger } from '../../utils/logger'
// Redis connection obtained once and handed to every queue declared below.
const connection = getRedis()

/** Builds a BullMQ queue with the given name on the module's Redis connection. */
const createQueue = (name: string) => new Queue(name, { connection })

/**
 * The application's BullMQ queues, keyed by purpose.
 * Each key matches the queue name used in Redis.
 */
export const queues = {
  tasks: createQueue('tasks'),
  deploys: createQueue('deploys'),
  merges: createQueue('merges'),
  cleanup: createQueue('cleanup'),
}
// Retention windows, expressed in seconds.
const ONE_HOUR_SECONDS = 60 * 60
const ONE_DAY_SECONDS = 24 * ONE_HOUR_SECONDS

/**
 * Baseline job options spread into every `queue.add(...)` call in this module.
 *
 * - up to 3 attempts with exponential backoff starting at 2000 ms
 * - completed jobs are kept for 1 hour, capped at 1000 entries
 * - failed jobs are kept for 24 hours
 */
export const defaultJobOptions = {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000,
  },
  removeOnComplete: {
    age: ONE_HOUR_SECONDS,
    count: 1000,
  },
  removeOnFail: {
    age: ONE_DAY_SECONDS,
  },
}
```
## Task Queue
```typescript
// services/queue/task-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'
/** Payload of a job placed on the `tasks` queue by enqueueTask. */
export interface TaskJob {
// Also used as the BullMQ job id when the task is enqueued.
taskId: string
projectId: string
// Translated by enqueueTask into a numeric queue priority (urgent = 1 ... low = 4).
priority: 'low' | 'medium' | 'high' | 'urgent'
}
/**
 * Adds a `process-task` job to the tasks queue.
 *
 * The task id doubles as the job id, and the textual priority is converted
 * to the numeric value passed to the queue (urgent = 1 ... low = 4).
 *
 * @param data - Task payload to enqueue.
 */
export async function enqueueTask(data: TaskJob) {
  const numericPriority = {
    urgent: 1,
    high: 2,
    medium: 3,
    low: 4,
  }[data.priority]

  const jobOptions = {
    ...defaultJobOptions,
    priority: numericPriority,
    jobId: data.taskId,
  }

  await queues.tasks.add('process-task', data, jobOptions)
  logger.info(`Task queued: ${data.taskId}`)
}
/**
 * Removes the job whose id equals `taskId` from the tasks queue.
 * Does nothing when no such job exists.
 *
 * @param taskId - Task id that was used as the job id at enqueue time.
 */
export async function dequeueTask(taskId: string) {
  const queuedJob = await queues.tasks.getJob(taskId)
  if (!queuedJob) {
    return
  }

  await queuedJob.remove()
  logger.info(`Task dequeued: ${taskId}`)
}
/**
 * Lists the jobs of the tasks queue that are currently waiting or active.
 *
 * Each job's state is fetched asynchronously, so the per-job summaries are
 * built inside an async callback and resolved together with `Promise.all`
 * (using `await` inside a plain, non-async `map` callback does not compile).
 *
 * NOTE(review): depending on the BullMQ version, jobs added with a `priority`
 * may be reported under a separate `prioritized` state — confirm whether that
 * state should be requested here as well.
 *
 * @returns One summary object per job: id, payload, state, progress and attempts made.
 */
export async function getQueuedTasks() {
  const jobs = await queues.tasks.getJobs(['waiting', 'active'])

  return Promise.all(
    jobs.map(async (job) => ({
      id: job.id,
      data: job.data,
      state: await job.getState(),
      progress: job.progress,
      attemptsMade: job.attemptsMade,
    }))
  )
}
```
## Deploy Queue
```typescript
// services/queue/deploy-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'
/** Payload of a job placed on the `deploys` queue by enqueueDeploy. */
export interface DeployJob {
// Also used as the BullMQ job id when the deploy is enqueued.
deploymentId: string
projectId: string
// Optional originating task; the deploy worker derives the preview namespace from it when present.
taskId?: string
// 'production' deploys are enqueued with priority 1, all others with priority 2.
environment: 'preview' | 'staging' | 'production'
branch: string
// The deploy worker uses the first 7 characters as the container image tag.
commitHash: string
}
/**
 * Adds a `deploy` job to the deploys queue, using the deployment id as job id.
 * Production deploys get priority 1; every other environment gets priority 2.
 *
 * @param data - Deployment payload to enqueue.
 */
export async function enqueueDeploy(data: DeployJob) {
  const isProduction = data.environment === 'production'
  const jobOptions = {
    ...defaultJobOptions,
    priority: isProduction ? 1 : 2,
    jobId: data.deploymentId,
  }

  await queues.deploys.add('deploy', data, jobOptions)
  logger.info(`Deploy queued: ${data.environment} - ${data.deploymentId}`)
}
/**
 * Looks up the deploy job for a deployment and summarizes its status.
 *
 * @param deploymentId - Deployment id that was used as the job id at enqueue time.
 * @returns `null` when no job exists, otherwise its id, state, progress,
 *          return value and failure reason.
 */
export async function getDeployStatus(deploymentId: string) {
  const deployJob = await queues.deploys.getJob(deploymentId)

  if (deployJob) {
    const state = await deployJob.getState()
    return {
      id: deployJob.id,
      state,
      progress: deployJob.progress,
      result: deployJob.returnvalue,
      failedReason: deployJob.failedReason,
    }
  }

  return null
}
```
## Merge Queue
```typescript
// services/queue/merge-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'
/** Payload of a job placed on the `merges` queue by enqueueMerge. */
export interface MergeJob {
// Also used as the BullMQ job id when the merge is enqueued.
taskGroupId: string
projectId: string
// Ids of the tasks whose pull requests the merge worker merges.
taskIds: string[]
// Merges into 'main' are enqueued with priority 1, merges into 'staging' with priority 2.
targetBranch: 'staging' | 'main'
}
/**
 * Adds a `merge-tasks` job to the merges queue, using the task group id as job id.
 * Merges targeting `main` get priority 1; merges targeting `staging` get priority 2.
 *
 * @param data - Merge payload to enqueue.
 */
export async function enqueueMerge(data: MergeJob) {
  const targetsMain = data.targetBranch === 'main'
  const jobOptions = {
    ...defaultJobOptions,
    priority: targetsMain ? 1 : 2,
    jobId: data.taskGroupId,
  }

  await queues.merges.add('merge-tasks', data, jobOptions)
  logger.info(`Merge queued: ${data.taskGroupId}`)
}
```
## Cleanup Queue
```typescript
// services/queue/cleanup-queue.ts
import { queues, defaultJobOptions } from './config'
import { logger } from '../../utils/logger'
/** Payload of a job placed on the `cleanup` queue. */
export interface CleanupJob {
// Kind of cleanup to run.
// NOTE(review): the cleanup worker's switch handles 'preview-namespace' and 'old-logs' only;
// 'completed-jobs' has no handler there — confirm whether that is intended.
type: 'preview-namespace' | 'old-logs' | 'completed-jobs'
// NOTE(review): scheduleRecurringCleanup enqueues payloads without this field and the
// cleanup worker does not read it — confirm whether it should be optional.
namespaceOrResource: string
// Age threshold in hours; the worker converts it to a cutoff timestamp for log deletion.
ageHours: number
}
/**
 * Adds a one-off `cleanup` job to the cleanup queue.
 * Uses the default job options but overrides `attempts` to 1, so a failed cleanup is not retried.
 *
 * @param data - Cleanup payload to enqueue.
 */
export async function enqueueCleanup(data: CleanupJob) {
  const singleAttemptOptions = { ...defaultJobOptions, attempts: 1 }

  await queues.cleanup.add('cleanup', data, singleAttemptOptions)
  logger.info(`Cleanup queued: ${data.type}`)
}
// Schedule recurring cleanup
/**
 * Registers the repeating cleanup jobs on the cleanup queue:
 *
 * - `cleanup-preview-namespaces`: daily at 02:00, preview namespaces older than 7 days
 * - `cleanup-old-logs`: Sundays at 03:00, logs older than 30 days
 *
 * The jobs are added one after another, in the order listed.
 */
export async function scheduleRecurringCleanup() {
  const recurringJobs = [
    {
      name: 'cleanup-preview-namespaces',
      payload: { type: 'preview-namespace', ageHours: 168 }, // 7 days
      cron: '0 2 * * *', // Daily at 2 AM
    },
    {
      name: 'cleanup-old-logs',
      payload: { type: 'old-logs', ageHours: 720 }, // 30 days
      cron: '0 3 * * 0', // Weekly on Sunday at 3 AM
    },
  ]

  for (const { name, payload, cron } of recurringJobs) {
    await queues.cleanup.add(name, payload, { repeat: { pattern: cron } })
  }

  logger.info('Recurring cleanup jobs scheduled')
}
```
## Workers Implementation
```typescript
// services/queue/workers.ts
import { Worker, Job } from 'bullmq'
import { eq, lt } from 'drizzle-orm'
import { getRedis } from '../../config/redis'
import { logger } from '../../utils/logger'
import { db } from '../../db/client'
import { tasks, agents, deployments, agentLogs } from '../../db/schema'
import { K8sClient } from '../kubernetes/client'
import { GiteaClient } from '../gitea/client'
import { scheduleRecurringCleanup } from './cleanup-queue'
import { TaskJob, DeployJob, MergeJob, CleanupJob } from './types'
// Redis connection passed to every worker defined in this module.
const connection = getRedis()
// Kubernetes client used by the deploy and cleanup workers.
const k8sClient = new K8sClient()
// Gitea client used by the merge worker to merge pull requests.
const giteaClient = new GiteaClient()
// ============================================
// TASK WORKER
// ============================================
/**
 * Processor for the `tasks` queue.
 *
 * Succeeds only when at least one agent has status `idle`; the task itself is
 * then fetched by an agent through MCP `get_next_task`. When no agent is idle
 * the processor throws, so the job fails and is left to the queue's retry settings.
 */
async function processTaskJob(job: Job<TaskJob>) {
  logger.info(`Processing task job: ${job.id}`)

  const idleAgent = await db.query.agents.findFirst({
    where: eq(agents.status, 'idle'),
  })

  if (idleAgent) {
    // Nothing else to do here: an agent picks the task up on its own.
    logger.info(`Task ${job.data.taskId} ready for agent pickup`)
    return { success: true, readyForPickup: true }
  }

  logger.info('No available agents, task will be retried')
  throw new Error('No available agents')
}

// Worker consuming the `tasks` queue, up to 5 jobs concurrently.
const taskWorker = new Worker('tasks', processTaskJob, {
  connection,
  concurrency: 5,
})

taskWorker.on('completed', (finishedJob) => {
  logger.info(`Task job completed: ${finishedJob.id}`)
})

// The job argument may be undefined on failure, hence the optional chaining.
taskWorker.on('failed', (failedJob, failure) => {
  logger.error(`Task job failed: ${failedJob?.id}`, failure)
})
// ============================================
// DEPLOY WORKER
// ============================================
/**
 * Chooses the Kubernetes namespace for a deployment.
 *
 * - production: `<base>-prod`
 * - staging: `<base>-staging`
 * - preview with a task id: `preview-task-<first 8 chars of the task id>`
 * - preview without a task id: the project's base namespace
 */
function resolveDeployNamespace(
  environment: DeployJob['environment'],
  baseNamespace: string,
  taskId?: string
): string {
  if (environment === 'production') return `${baseNamespace}-prod`
  if (environment === 'staging') return `${baseNamespace}-staging`
  return taskId ? `preview-task-${taskId.slice(0, 8)}` : baseNamespace
}

/**
 * Worker consuming the `deploys` queue (up to 3 jobs concurrently).
 *
 * For each job it marks the deployment row `in_progress`, loads the project,
 * creates or updates the Kubernetes deployment and ingress, then stores the
 * resulting URL and duration with status `completed`. On any error the row is
 * marked `failed` with the error message and the error is rethrown so the
 * queue records the failure.
 *
 * @returns `{ success: true, url }` on success.
 * @throws Error('Project not found') when the project row does not exist, or
 *         whatever the database / Kubernetes client raised.
 */
const deployWorker = new Worker(
  'deploys',
  async (job: Job<DeployJob>) => {
    const { deploymentId, projectId, environment, commitHash } = job.data
    // Captured once so the stored start time and the computed duration agree.
    const startedAt = new Date()

    logger.info(`Deploying: ${environment} - ${deploymentId}`)

    await db.update(deployments)
      .set({
        status: 'in_progress',
        startedAt,
      })
      .where(eq(deployments.id, deploymentId))

    try {
      // Progress updates are awaited so a Redis failure surfaces here instead
      // of becoming an unhandled promise rejection.
      await job.updateProgress(10)

      // Filter on the projects table's own id column; the callback form keeps
      // the column reference tied to the table being queried.
      const project = await db.query.projects.findFirst({
        where: (projectsTable, operators) => operators.eq(projectsTable.id, projectId),
      })
      if (!project) {
        throw new Error('Project not found')
      }
      await job.updateProgress(20)

      const namespace = resolveDeployNamespace(
        environment,
        project.k8sNamespace,
        job.data.taskId
      )
      // Deployment, ingress and backing service all share this name.
      const resourceName = `${project.name}-${environment}`
      await job.updateProgress(40)

      // Deploy to K8s; the image tag is the short (7 character) commit hash.
      await k8sClient.createOrUpdateDeployment({
        namespace,
        name: resourceName,
        image: `${project.dockerImage}:${commitHash.slice(0, 7)}`,
        envVars: project.envVars as Record<string, string>,
        replicas: project.replicas || 1,
        resources: {
          cpu: project.cpuLimit || '500m',
          memory: project.memoryLimit || '512Mi',
        },
      })
      await job.updateProgress(70)

      // Create/update ingress; production gets the bare project host name.
      const url = await k8sClient.createOrUpdateIngress({
        namespace,
        name: resourceName,
        host: environment === 'production'
          ? `${project.name}.aiworker.dev`
          : `${environment}-${project.name}.aiworker.dev`,
        serviceName: resourceName,
        servicePort: 3000,
      })
      await job.updateProgress(90)

      const completedAt = new Date()
      await db.update(deployments)
        .set({
          status: 'completed',
          completedAt,
          url,
          durationSeconds: Math.floor(
            (completedAt.getTime() - startedAt.getTime()) / 1000
          ),
        })
        .where(eq(deployments.id, deploymentId))
      await job.updateProgress(100)

      logger.info(`Deploy completed: ${environment} - ${url}`)
      return { success: true, url }
    } catch (error: unknown) {
      // Anything can be thrown, so narrow before reading `.message`.
      const errorMessage = error instanceof Error ? error.message : String(error)

      await db.update(deployments)
        .set({
          status: 'failed',
          errorMessage,
          completedAt: new Date(),
        })
        .where(eq(deployments.id, deploymentId))

      throw error
    }
  },
  {
    connection,
    concurrency: 3,
  }
)
// ============================================
// MERGE WORKER
// ============================================
/**
 * Worker consuming the `merges` queue (up to 2 jobs concurrently).
 *
 * Loads the project and the tasks named in the job, then squash-merges the
 * pull request of every task that has one, in sequence. Progress goes from 20
 * to 60 proportionally to the number of tasks processed, then to 100.
 *
 * @returns `{ success: true }` on success.
 * @throws Error when the project does not exist, when a pull request must be
 *         merged but the project has no Gitea owner/repository configured, or
 *         whatever the database / Gitea client raised.
 */
const mergeWorker = new Worker(
  'merges',
  async (job: Job<MergeJob>) => {
    const { taskGroupId, projectId, taskIds, targetBranch } = job.data

    logger.info(`Merging tasks: ${taskIds.join(', ')} to ${targetBranch}`)

    // Filter on the projects table's own id column; the callback form keeps
    // the column reference tied to the table being queried.
    const project = await db.query.projects.findFirst({
      where: (projectsTable, operators) => operators.eq(projectsTable.id, projectId),
    })
    if (!project) {
      throw new Error('Project not found')
    }

    const tasksList = await db.query.tasks.findMany({
      where: (tasks, { inArray }) => inArray(tasks.id, taskIds),
    })
    await job.updateProgress(20)

    // Merge each PR
    let processedCount = 0
    for (const task of tasksList) {
      if (task.prNumber) {
        // Fail with a clear message instead of passing null repository
        // coordinates to the Gitea client.
        if (!project.giteaOwner || !project.giteaRepoName) {
          throw new Error(`Project ${projectId} has no Gitea repository configured`)
        }
        await giteaClient.mergePullRequest(
          project.giteaOwner,
          project.giteaRepoName,
          task.prNumber,
          'squash'
        )
      }

      // Advance through the 20–60 band as tasks are processed, so the
      // reported progress actually moves during the loop.
      processedCount += 1
      await job.updateProgress(
        20 + Math.round((processedCount / tasksList.length) * 40)
      )
    }
    await job.updateProgress(60)

    // Create staging/production branch if needed
    // Then trigger deploy
    // ... implementation

    await job.updateProgress(100)
    logger.info(`Merge completed: ${taskGroupId}`)
    return { success: true }
  },
  {
    connection,
    concurrency: 2,
  }
)
// ============================================
// CLEANUP WORKER
// ============================================
/**
 * Worker consuming the `cleanup` queue, one job at a time.
 *
 * - `preview-namespace`: asks the Kubernetes client to remove preview
 *   namespaces older than `ageHours`.
 * - `old-logs`: deletes agent log rows created before now minus `ageHours`.
 * - any other type: nothing is cleaned; this is logged instead of being
 *   reported as a completed cleanup.
 *
 * @returns `{ success: true }` in every non-throwing case.
 */
const cleanupWorker = new Worker(
  'cleanup',
  async (job: Job<CleanupJob>) => {
    const { type, ageHours } = job.data

    logger.info(`Cleanup: ${type}`)

    switch (type) {
      case 'preview-namespace':
        await k8sClient.cleanupOldPreviewNamespaces(ageHours)
        break

      case 'old-logs': {
        // Braces give the declaration its own scope instead of leaking it
        // into the other cases of the switch.
        const cutoffDate = new Date(Date.now() - ageHours * 60 * 60 * 1000)
        await db.delete(agentLogs)
          .where(lt(agentLogs.createdAt, cutoffDate))
        break
      }

      default:
        // No handler exists for this type; make the no-op visible.
        logger.info(`Cleanup skipped: no handler for type ${type}`)
        return { success: true }
    }

    logger.info(`Cleanup completed: ${type}`)
    return { success: true }
  },
  {
    connection,
    concurrency: 1,
  }
)
// ============================================
// START ALL WORKERS
// ============================================
/**
 * Finishes queue start-up and exposes the worker instances.
 *
 * The workers themselves are created when this module is loaded; this
 * function only schedules the recurring cleanup jobs.
 *
 * @returns The four worker instances, keyed by variable name.
 */
export async function startQueueWorkers() {
  logger.info('Starting BullMQ workers...')

  await scheduleRecurringCleanup()

  const workers = { taskWorker, deployWorker, mergeWorker, cleanupWorker }
  logger.info('✓ All workers started')
  return workers
}
// Graceful shutdown
// On SIGTERM, close the workers one after another (task, deploy, merge,
// cleanup) and then exit with status 0.
process.on('SIGTERM', async () => {
  logger.info('Shutting down workers...')

  const workersInCloseOrder = [taskWorker, deployWorker, mergeWorker, cleanupWorker]
  for (const worker of workersInCloseOrder) {
    await worker.close()
  }

  logger.info('Workers shut down')
  process.exit(0)
})
```
## Monitorización de Colas
```typescript
// api/routes/queues.ts
import { Router } from 'express'
import { queues } from '../../services/queue/config'
const router = Router()

/**
 * GET /status — reports, for every queue in `queues`, the number of waiting,
 * active, completed and failed jobs as `{ queues: [...] }`.
 *
 * The four counts of a queue are independent, so they are requested
 * concurrently. Any failure is forwarded to Express' error handling via
 * `next`, so a rejected count does not leave the request hanging.
 */
router.get('/status', async (_req, res, next) => {
  try {
    const status = await Promise.all(
      Object.entries(queues).map(async ([name, queue]) => {
        const [waiting, active, completed, failed] = await Promise.all([
          queue.getWaitingCount(),
          queue.getActiveCount(),
          queue.getCompletedCount(),
          queue.getFailedCount(),
        ])
        return { name, waiting, active, completed, failed }
      })
    )

    res.json({ queues: status })
  } catch (err) {
    next(err)
  }
})

export default router
```