From f104425b915a1a5690ace8adb70e417473978b5a Mon Sep 17 00:00:00 2001 From: Hector Ros Date: Tue, 20 Jan 2026 17:34:10 +0100 Subject: [PATCH] Implement Kubernetes pod management for agents - Add @kubernetes/client-node dependency - Create K8s client utilities in src/lib/k8s.ts - Implement createAgentPod and deleteAgentPod functions - Update launchAgent to actually create pods in K8s - Update unregisterAgent to delete pods from K8s - Initialize K8s client on backend startup - Add rollback logic if pod creation fails Co-Authored-By: Claude Sonnet 4.5 (1M context) --- src/api/routes/agents.ts | 33 ++++--- src/index.ts | 10 ++ src/lib/k8s.ts | 199 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 11 deletions(-) create mode 100644 src/lib/k8s.ts diff --git a/src/api/routes/agents.ts b/src/api/routes/agents.ts index 1b87974..5efc372 100644 --- a/src/api/routes/agents.ts +++ b/src/api/routes/agents.ts @@ -8,6 +8,7 @@ import { agents, tasks } from '../../db/schema' import { eq, and } from 'drizzle-orm' import { randomUUID } from 'crypto' import { authenticateRequest } from '../middleware/auth' +import { createAgentPod, deleteAgentPod } from '../../lib/k8s' /** * Handle all agent routes @@ -331,14 +332,20 @@ async function unregisterAgent(agentId: string, userId: string): Promise { /** * Launch new agent (create pod dynamically) - * TODO: Integrate with K8s API to create pod */ async function launchAgent(userId: string, req: Request): Promise { try { const agentId = randomUUID() const podName = `claude-agent-${userId.slice(0, 8)}-${Date.now()}` - // Create agent record in DB + // Create agent record in DB first const newAgent = { id: agentId, userId, @@ -396,20 +402,25 @@ async function launchAgent(userId: string, req: Request): Promise { await db.insert(agents).values(newAgent) - // TODO: Create K8s pod using K8s API - // For now, just return the agent record - // In production, this would call kubectl or use @kubernetes/client-node + // Create K8s pod + try { + await createAgentPod(podName, userId) + } catch (k8sError: any) { + // If pod creation fails, rollback DB entry + await db.delete(agents).where(eq(agents.id, agentId)) + throw new Error(`Failed to create pod: ${k8sError.message}`) + } return Response.json({ success: true, data: newAgent, - message: 'Agent launch initiated. Pod will be created shortly.', + message: 'Agent launched successfully. Pod is starting...', }, { status: 201 }) - } catch (error) { + } catch (error: any) { console.error('Error launching agent:', error) return Response.json({ success: false, - error: 'Failed to launch agent', + error: error.message || 'Failed to launch agent', }, { status: 500 }) } } diff --git a/src/index.ts b/src/index.ts index b7adc5b..a510eb5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,7 @@ import { handleAuthRoutes, handleProjectRoutes, handleTaskRoutes, handleAgentRou import { authenticateRequest } from './api/middleware/auth' import { agents } from './db/schema' import { eq, and } from 'drizzle-orm' +import { initK8sClient } from './lib/k8s' console.log('🚀 Starting AiWorker Backend...') console.log(`Bun version: ${Bun.version}`) @@ -20,6 +21,15 @@ await runMigrations() // Test database connection await testConnection() +// Initialize Kubernetes client +try { + initK8sClient() + console.log('✅ Kubernetes client initialized') +} catch (error) { + console.error('⚠️ Failed to initialize Kubernetes client:', error) + console.log('⚠️ Agent pod management will not be available') +} + const PORT = process.env.PORT || 3000 // Health check route diff --git a/src/lib/k8s.ts b/src/lib/k8s.ts new file mode 100644 index 0000000..e262d2d --- /dev/null +++ b/src/lib/k8s.ts @@ -0,0 +1,199 @@ +/** + * Kubernetes API client utilities + */ + +import * as k8s from '@kubernetes/client-node' + +let k8sClient: k8s.CoreV1Api | null = null +let k8sConfig: k8s.KubeConfig | null = null + +/** + * Initialize Kubernetes client + */ +export function initK8sClient() { + if (k8sClient) return k8sClient + + k8sConfig = new k8s.KubeConfig() + + // Check if running in cluster + const inCluster = process.env.K8S_IN_CLUSTER === 'true' + + if (inCluster) { + k8sConfig.loadFromCluster() + } else { + // Load from kubeconfig file + const configPath = process.env.K8S_CONFIG_PATH || process.env.KUBECONFIG || '~/.kube/config' + k8sConfig.loadFromFile(configPath) + } + + k8sClient = k8sConfig.makeApiClient(k8s.CoreV1Api) + return k8sClient +} + +/** + * Get Kubernetes client + */ +export function getK8sClient(): k8s.CoreV1Api { + if (!k8sClient) { + return initK8sClient() + } + return k8sClient +} + +/** + * Create pod spec for agent + */ +export function createAgentPodSpec(podName: string, userId: string) { + return { + apiVersion: 'v1', + kind: 'Pod', + metadata: { + name: podName, + namespace: 'agents', + labels: { + app: 'claude-agent', + userId: userId, + 'aiworker.io/agent': 'true', + }, + }, + spec: { + serviceAccountName: 'agent-sa', + imagePullSecrets: [ + { + name: 'gitea-registry', + }, + ], + containers: [ + { + name: 'agent', + image: 'git.fuq.tv/admin/aiworker-agent:latest', + imagePullPolicy: 'Always', + ports: [ + { + containerPort: 7681, + name: 'terminal', + }, + ], + env: [ + { + name: 'BACKEND_URL', + value: 'https://api.fuq.tv', + }, + { + name: 'MCP_ENDPOINT', + value: 'https://api.fuq.tv/api/mcp', + }, + { + name: 'GITEA_URL', + value: 'https://git.fuq.tv', + }, + { + name: 'GITEA_TOKEN', + valueFrom: { + secretKeyRef: { + name: 'agent-secrets', + key: 'gitea-token', + }, + }, + }, + { + name: 'POD_NAME', + valueFrom: { + fieldRef: { + fieldPath: 'metadata.name', + }, + }, + }, + { + name: 'NAMESPACE', + valueFrom: { + fieldRef: { + fieldPath: 'metadata.namespace', + }, + }, + }, + { + name: 'USER_ID', + value: userId, + }, + ], + resources: { + requests: { + cpu: '500m', + memory: '1Gi', + }, + limits: { + cpu: '2000m', + memory: '4Gi', + }, + }, + volumeMounts: [ + { + name: 'workspace', + mountPath: '/workspace', + }, + ], + }, + ], + volumes: [ + { + name: 'workspace', + emptyDir: {}, + }, + ], + }, + } +} + +/** + * Create agent pod in Kubernetes + */ +export async function createAgentPod(podName: string, userId: string): Promise { + const client = getK8sClient() + const podSpec = createAgentPodSpec(podName, userId) + + try { + await client.createNamespacedPod('agents', podSpec) + console.log(`✅ Pod ${podName} created successfully`) + } catch (error: any) { + console.error(`❌ Failed to create pod ${podName}:`, error.message) + throw error + } +} + +/** + * Delete agent pod from Kubernetes + */ +export async function deleteAgentPod(podName: string): Promise { + const client = getK8sClient() + + try { + await client.deleteNamespacedPod(podName, 'agents') + console.log(`✅ Pod ${podName} deleted successfully`) + } catch (error: any) { + // Ignore 404 errors (pod already deleted) + if (error.statusCode === 404) { + console.log(`⚠️ Pod ${podName} not found (already deleted)`) + return + } + console.error(`❌ Failed to delete pod ${podName}:`, error.message) + throw error + } +} + +/** + * Get pod status + */ +export async function getPodStatus(podName: string): Promise { + const client = getK8sClient() + + try { + const response = await client.readNamespacedPod(podName, 'agents') + return response.body.status?.phase || null + } catch (error: any) { + if (error.statusCode === 404) { + return null + } + throw error + } +}