Implement Kubernetes pod management for agents
All checks were successful
Build and Push Backend / build (push) Successful in 5s
All checks were successful
Build and Push Backend / build (push) Successful in 5s
- Add @kubernetes/client-node dependency - Create K8s client utilities in src/lib/k8s.ts - Implement createAgentPod and deleteAgentPod functions - Update launchAgent to actually create pods in K8s - Update unregisterAgent to delete pods from K8s - Initialize K8s client on backend startup - Add rollback logic if pod creation fails Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,7 @@ import { agents, tasks } from '../../db/schema'
|
|||||||
import { eq, and } from 'drizzle-orm'
|
import { eq, and } from 'drizzle-orm'
|
||||||
import { randomUUID } from 'crypto'
|
import { randomUUID } from 'crypto'
|
||||||
import { authenticateRequest } from '../middleware/auth'
|
import { authenticateRequest } from '../middleware/auth'
|
||||||
|
import { createAgentPod, deleteAgentPod } from '../../lib/k8s'
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle all agent routes
|
* Handle all agent routes
|
||||||
@@ -331,14 +332,20 @@ async function unregisterAgent(agentId: string, userId: string): Promise<Respons
|
|||||||
.where(eq(tasks.id, existing[0].currentTaskId))
|
.where(eq(tasks.id, existing[0].currentTaskId))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete K8s pod
|
||||||
|
try {
|
||||||
|
await deleteAgentPod(existing[0].podName)
|
||||||
|
} catch (k8sError) {
|
||||||
|
console.error('Failed to delete pod, continuing...', k8sError)
|
||||||
|
// Continue even if pod deletion fails
|
||||||
|
}
|
||||||
|
|
||||||
// Delete agent from DB
|
// Delete agent from DB
|
||||||
await db.delete(agents).where(eq(agents.id, agentId))
|
await db.delete(agents).where(eq(agents.id, agentId))
|
||||||
|
|
||||||
// TODO: Delete K8s pod if it exists
|
|
||||||
|
|
||||||
return Response.json({
|
return Response.json({
|
||||||
success: true,
|
success: true,
|
||||||
message: 'Agent unregistered',
|
message: 'Agent deleted successfully',
|
||||||
})
|
})
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error unregistering agent:', error)
|
console.error('Error unregistering agent:', error)
|
||||||
@@ -375,14 +382,13 @@ async function listMyAgents(userId: string): Promise<Response> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Launch new agent (create pod dynamically)
|
* Launch new agent (create pod dynamically)
|
||||||
* TODO: Integrate with K8s API to create pod
|
|
||||||
*/
|
*/
|
||||||
async function launchAgent(userId: string, req: Request): Promise<Response> {
|
async function launchAgent(userId: string, req: Request): Promise<Response> {
|
||||||
try {
|
try {
|
||||||
const agentId = randomUUID()
|
const agentId = randomUUID()
|
||||||
const podName = `claude-agent-${userId.slice(0, 8)}-${Date.now()}`
|
const podName = `claude-agent-${userId.slice(0, 8)}-${Date.now()}`
|
||||||
|
|
||||||
// Create agent record in DB
|
// Create agent record in DB first
|
||||||
const newAgent = {
|
const newAgent = {
|
||||||
id: agentId,
|
id: agentId,
|
||||||
userId,
|
userId,
|
||||||
@@ -396,20 +402,25 @@ async function launchAgent(userId: string, req: Request): Promise<Response> {
|
|||||||
|
|
||||||
await db.insert(agents).values(newAgent)
|
await db.insert(agents).values(newAgent)
|
||||||
|
|
||||||
// TODO: Create K8s pod using K8s API
|
// Create K8s pod
|
||||||
// For now, just return the agent record
|
try {
|
||||||
// In production, this would call kubectl or use @kubernetes/client-node
|
await createAgentPod(podName, userId)
|
||||||
|
} catch (k8sError: any) {
|
||||||
|
// If pod creation fails, rollback DB entry
|
||||||
|
await db.delete(agents).where(eq(agents.id, agentId))
|
||||||
|
throw new Error(`Failed to create pod: ${k8sError.message}`)
|
||||||
|
}
|
||||||
|
|
||||||
return Response.json({
|
return Response.json({
|
||||||
success: true,
|
success: true,
|
||||||
data: newAgent,
|
data: newAgent,
|
||||||
message: 'Agent launch initiated. Pod will be created shortly.',
|
message: 'Agent launched successfully. Pod is starting...',
|
||||||
}, { status: 201 })
|
}, { status: 201 })
|
||||||
} catch (error) {
|
} catch (error: any) {
|
||||||
console.error('Error launching agent:', error)
|
console.error('Error launching agent:', error)
|
||||||
return Response.json({
|
return Response.json({
|
||||||
success: false,
|
success: false,
|
||||||
error: 'Failed to launch agent',
|
error: error.message || 'Failed to launch agent',
|
||||||
}, { status: 500 })
|
}, { status: 500 })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
10
src/index.ts
10
src/index.ts
@@ -9,6 +9,7 @@ import { handleAuthRoutes, handleProjectRoutes, handleTaskRoutes, handleAgentRou
|
|||||||
import { authenticateRequest } from './api/middleware/auth'
|
import { authenticateRequest } from './api/middleware/auth'
|
||||||
import { agents } from './db/schema'
|
import { agents } from './db/schema'
|
||||||
import { eq, and } from 'drizzle-orm'
|
import { eq, and } from 'drizzle-orm'
|
||||||
|
import { initK8sClient } from './lib/k8s'
|
||||||
|
|
||||||
console.log('🚀 Starting AiWorker Backend...')
|
console.log('🚀 Starting AiWorker Backend...')
|
||||||
console.log(`Bun version: ${Bun.version}`)
|
console.log(`Bun version: ${Bun.version}`)
|
||||||
@@ -20,6 +21,15 @@ await runMigrations()
|
|||||||
// Test database connection
|
// Test database connection
|
||||||
await testConnection()
|
await testConnection()
|
||||||
|
|
||||||
|
// Initialize Kubernetes client
|
||||||
|
try {
|
||||||
|
initK8sClient()
|
||||||
|
console.log('✅ Kubernetes client initialized')
|
||||||
|
} catch (error) {
|
||||||
|
console.error('⚠️ Failed to initialize Kubernetes client:', error)
|
||||||
|
console.log('⚠️ Agent pod management will not be available')
|
||||||
|
}
|
||||||
|
|
||||||
const PORT = process.env.PORT || 3000
|
const PORT = process.env.PORT || 3000
|
||||||
|
|
||||||
// Health check route
|
// Health check route
|
||||||
|
|||||||
199
src/lib/k8s.ts
Normal file
199
src/lib/k8s.ts
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
/**
|
||||||
|
* Kubernetes API client utilities
|
||||||
|
*/
|
||||||
|
|
||||||
|
import * as k8s from '@kubernetes/client-node'
|
||||||
|
|
||||||
|
let k8sClient: k8s.CoreV1Api | null = null
|
||||||
|
let k8sConfig: k8s.KubeConfig | null = null
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize Kubernetes client
|
||||||
|
*/
|
||||||
|
export function initK8sClient() {
|
||||||
|
if (k8sClient) return k8sClient
|
||||||
|
|
||||||
|
k8sConfig = new k8s.KubeConfig()
|
||||||
|
|
||||||
|
// Check if running in cluster
|
||||||
|
const inCluster = process.env.K8S_IN_CLUSTER === 'true'
|
||||||
|
|
||||||
|
if (inCluster) {
|
||||||
|
k8sConfig.loadFromCluster()
|
||||||
|
} else {
|
||||||
|
// Load from kubeconfig file
|
||||||
|
const configPath = process.env.K8S_CONFIG_PATH || process.env.KUBECONFIG || '~/.kube/config'
|
||||||
|
k8sConfig.loadFromFile(configPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
k8sClient = k8sConfig.makeApiClient(k8s.CoreV1Api)
|
||||||
|
return k8sClient
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Kubernetes client
|
||||||
|
*/
|
||||||
|
export function getK8sClient(): k8s.CoreV1Api {
|
||||||
|
if (!k8sClient) {
|
||||||
|
return initK8sClient()
|
||||||
|
}
|
||||||
|
return k8sClient
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create pod spec for agent
|
||||||
|
*/
|
||||||
|
export function createAgentPodSpec(podName: string, userId: string) {
|
||||||
|
return {
|
||||||
|
apiVersion: 'v1',
|
||||||
|
kind: 'Pod',
|
||||||
|
metadata: {
|
||||||
|
name: podName,
|
||||||
|
namespace: 'agents',
|
||||||
|
labels: {
|
||||||
|
app: 'claude-agent',
|
||||||
|
userId: userId,
|
||||||
|
'aiworker.io/agent': 'true',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
spec: {
|
||||||
|
serviceAccountName: 'agent-sa',
|
||||||
|
imagePullSecrets: [
|
||||||
|
{
|
||||||
|
name: 'gitea-registry',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
containers: [
|
||||||
|
{
|
||||||
|
name: 'agent',
|
||||||
|
image: 'git.fuq.tv/admin/aiworker-agent:latest',
|
||||||
|
imagePullPolicy: 'Always',
|
||||||
|
ports: [
|
||||||
|
{
|
||||||
|
containerPort: 7681,
|
||||||
|
name: 'terminal',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
env: [
|
||||||
|
{
|
||||||
|
name: 'BACKEND_URL',
|
||||||
|
value: 'https://api.fuq.tv',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'MCP_ENDPOINT',
|
||||||
|
value: 'https://api.fuq.tv/api/mcp',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'GITEA_URL',
|
||||||
|
value: 'https://git.fuq.tv',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'GITEA_TOKEN',
|
||||||
|
valueFrom: {
|
||||||
|
secretKeyRef: {
|
||||||
|
name: 'agent-secrets',
|
||||||
|
key: 'gitea-token',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'POD_NAME',
|
||||||
|
valueFrom: {
|
||||||
|
fieldRef: {
|
||||||
|
fieldPath: 'metadata.name',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'NAMESPACE',
|
||||||
|
valueFrom: {
|
||||||
|
fieldRef: {
|
||||||
|
fieldPath: 'metadata.namespace',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'USER_ID',
|
||||||
|
value: userId,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
resources: {
|
||||||
|
requests: {
|
||||||
|
cpu: '500m',
|
||||||
|
memory: '1Gi',
|
||||||
|
},
|
||||||
|
limits: {
|
||||||
|
cpu: '2000m',
|
||||||
|
memory: '4Gi',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
volumeMounts: [
|
||||||
|
{
|
||||||
|
name: 'workspace',
|
||||||
|
mountPath: '/workspace',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
volumes: [
|
||||||
|
{
|
||||||
|
name: 'workspace',
|
||||||
|
emptyDir: {},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create agent pod in Kubernetes
|
||||||
|
*/
|
||||||
|
export async function createAgentPod(podName: string, userId: string): Promise<void> {
|
||||||
|
const client = getK8sClient()
|
||||||
|
const podSpec = createAgentPodSpec(podName, userId)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.createNamespacedPod('agents', podSpec)
|
||||||
|
console.log(`✅ Pod ${podName} created successfully`)
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error(`❌ Failed to create pod ${podName}:`, error.message)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete agent pod from Kubernetes
|
||||||
|
*/
|
||||||
|
export async function deleteAgentPod(podName: string): Promise<void> {
|
||||||
|
const client = getK8sClient()
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.deleteNamespacedPod(podName, 'agents')
|
||||||
|
console.log(`✅ Pod ${podName} deleted successfully`)
|
||||||
|
} catch (error: any) {
|
||||||
|
// Ignore 404 errors (pod already deleted)
|
||||||
|
if (error.statusCode === 404) {
|
||||||
|
console.log(`⚠️ Pod ${podName} not found (already deleted)`)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
console.error(`❌ Failed to delete pod ${podName}:`, error.message)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get pod status
|
||||||
|
*/
|
||||||
|
export async function getPodStatus(podName: string): Promise<string | null> {
|
||||||
|
const client = getK8sClient()
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await client.readNamespacedPod(podName, 'agents')
|
||||||
|
return response.body.status?.phase || null
|
||||||
|
} catch (error: any) {
|
||||||
|
if (error.statusCode === 404) {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user