Compare commits
12 Commits
8382f6645e
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db09616a69 | ||
|
|
6864258810 | ||
|
|
209b439d26 | ||
|
|
3fef6030ea | ||
|
|
65b18d13b5 | ||
|
|
9eb9def85c | ||
|
|
697ee1b426 | ||
|
|
3bc59dc964 | ||
|
|
14ae28f13c | ||
|
|
e0c6884a7b | ||
|
|
6f1c51bfd8 | ||
|
|
f104425b91 |
@@ -8,6 +8,7 @@ import { agents, tasks } from '../../db/schema'
|
||||
import { eq, and } from 'drizzle-orm'
|
||||
import { randomUUID } from 'crypto'
|
||||
import { authenticateRequest } from '../middleware/auth'
|
||||
import { createAgentPod, deleteAgentPod, createAgentService, deleteAgentService } from '../../lib/k8s'
|
||||
|
||||
/**
|
||||
* Handle all agent routes
|
||||
@@ -331,14 +332,21 @@ async function unregisterAgent(agentId: string, userId: string): Promise<Respons
|
||||
.where(eq(tasks.id, existing[0].currentTaskId))
|
||||
}
|
||||
|
||||
// Delete K8s pod and service
|
||||
try {
|
||||
await deleteAgentPod(existing[0].podName)
|
||||
await deleteAgentService(existing[0].podName)
|
||||
} catch (k8sError) {
|
||||
console.error('Failed to delete pod/service, continuing...', k8sError)
|
||||
// Continue even if deletion fails
|
||||
}
|
||||
|
||||
// Delete agent from DB
|
||||
await db.delete(agents).where(eq(agents.id, agentId))
|
||||
|
||||
// TODO: Delete K8s pod if it exists
|
||||
|
||||
return Response.json({
|
||||
success: true,
|
||||
message: 'Agent unregistered',
|
||||
message: 'Agent deleted successfully',
|
||||
})
|
||||
} catch (error) {
|
||||
console.error('Error unregistering agent:', error)
|
||||
@@ -375,14 +383,13 @@ async function listMyAgents(userId: string): Promise<Response> {
|
||||
|
||||
/**
|
||||
* Launch new agent (create pod dynamically)
|
||||
* TODO: Integrate with K8s API to create pod
|
||||
*/
|
||||
async function launchAgent(userId: string, req: Request): Promise<Response> {
|
||||
try {
|
||||
const agentId = randomUUID()
|
||||
const podName = `claude-agent-${userId.slice(0, 8)}-${Date.now()}`
|
||||
|
||||
// Create agent record in DB
|
||||
// Create agent record in DB first
|
||||
const newAgent = {
|
||||
id: agentId,
|
||||
userId,
|
||||
@@ -396,20 +403,26 @@ async function launchAgent(userId: string, req: Request): Promise<Response> {
|
||||
|
||||
await db.insert(agents).values(newAgent)
|
||||
|
||||
// TODO: Create K8s pod using K8s API
|
||||
// For now, just return the agent record
|
||||
// In production, this would call kubectl or use @kubernetes/client-node
|
||||
// Create K8s pod and service
|
||||
try {
|
||||
await createAgentPod(podName, userId, agentId)
|
||||
await createAgentService(podName, agentId)
|
||||
} catch (k8sError: any) {
|
||||
// If pod/service creation fails, rollback DB entry
|
||||
await db.delete(agents).where(eq(agents.id, agentId))
|
||||
throw new Error(`Failed to create pod: ${k8sError.message}`)
|
||||
}
|
||||
|
||||
return Response.json({
|
||||
success: true,
|
||||
data: newAgent,
|
||||
message: 'Agent launch initiated. Pod will be created shortly.',
|
||||
message: 'Agent launched successfully. Pod is starting...',
|
||||
}, { status: 201 })
|
||||
} catch (error) {
|
||||
} catch (error: any) {
|
||||
console.error('Error launching agent:', error)
|
||||
return Response.json({
|
||||
success: false,
|
||||
error: 'Failed to launch agent',
|
||||
error: error.message || 'Failed to launch agent',
|
||||
}, { status: 500 })
|
||||
}
|
||||
}
|
||||
|
||||
24
src/index.ts
24
src/index.ts
@@ -9,6 +9,7 @@ import { handleAuthRoutes, handleProjectRoutes, handleTaskRoutes, handleAgentRou
|
||||
import { authenticateRequest } from './api/middleware/auth'
|
||||
import { agents } from './db/schema'
|
||||
import { eq, and } from 'drizzle-orm'
|
||||
import { initK8sClient } from './lib/k8s'
|
||||
|
||||
console.log('🚀 Starting AiWorker Backend...')
|
||||
console.log(`Bun version: ${Bun.version}`)
|
||||
@@ -20,6 +21,15 @@ await runMigrations()
|
||||
// Test database connection
|
||||
await testConnection()
|
||||
|
||||
// Initialize Kubernetes client
|
||||
try {
|
||||
initK8sClient()
|
||||
console.log('✅ Kubernetes client initialized')
|
||||
} catch (error) {
|
||||
console.error('⚠️ Failed to initialize Kubernetes client:', error)
|
||||
console.log('⚠️ Agent pod management will not be available')
|
||||
}
|
||||
|
||||
const PORT = process.env.PORT || 3000
|
||||
|
||||
// Health check route
|
||||
@@ -100,19 +110,23 @@ const server = Bun.serve({
|
||||
)
|
||||
}
|
||||
|
||||
// Proxy to agent terminal
|
||||
const agentUrl = `http://${agent.podName}.agents.svc.cluster.local:7681${url.pathname.replace(`/agent-terminal/${agentId}`, '')}${url.search}`
|
||||
// Proxy to agent terminal via service DNS
|
||||
// Service name: {podName}-terminal.agents.svc.cluster.local:7681
|
||||
const agentPath = url.pathname.replace(`/agent-terminal/${agentId}`, '') || '/'
|
||||
const serviceUrl = `http://${agent.podName}-terminal.agents.svc.cluster.local:7681${agentPath}${url.search}`
|
||||
|
||||
console.log(`🔄 Proxying terminal request to ${serviceUrl}`)
|
||||
|
||||
try {
|
||||
const response = await fetch(agentUrl, {
|
||||
const response = await fetch(serviceUrl, {
|
||||
method: req.method,
|
||||
headers: req.headers,
|
||||
body: req.body,
|
||||
})
|
||||
|
||||
return response
|
||||
} catch (error) {
|
||||
console.error('Terminal proxy error:', error)
|
||||
} catch (error: any) {
|
||||
console.error('Terminal proxy error:', error.message)
|
||||
return Response.json(
|
||||
{ success: false, message: 'Failed to connect to agent terminal' },
|
||||
{ status: 502 }
|
||||
|
||||
361
src/lib/k8s.ts
Normal file
361
src/lib/k8s.ts
Normal file
@@ -0,0 +1,361 @@
|
||||
/**
|
||||
* Kubernetes API client utilities
|
||||
*/
|
||||
|
||||
import * as k8s from '@kubernetes/client-node'
|
||||
import * as https from 'https'
|
||||
|
||||
let k8sClient: k8s.CoreV1Api | null = null
|
||||
let k8sConfig: k8s.KubeConfig | null = null
|
||||
|
||||
/**
|
||||
* Initialize Kubernetes client
|
||||
*/
|
||||
export function initK8sClient() {
|
||||
if (k8sClient) return k8sClient
|
||||
|
||||
k8sConfig = new k8s.KubeConfig()
|
||||
|
||||
// Check if running in cluster
|
||||
const inCluster = process.env.K8S_IN_CLUSTER === 'true'
|
||||
|
||||
if (inCluster) {
|
||||
k8sConfig.loadFromCluster()
|
||||
console.log('📦 Loaded K8s config from cluster')
|
||||
|
||||
// Skip TLS verification when in cluster
|
||||
// This is needed because the cluster uses self-signed certificates
|
||||
const cluster = k8sConfig.getCurrentCluster()
|
||||
console.log('📦 Current cluster:', cluster)
|
||||
|
||||
if (cluster) {
|
||||
cluster.skipTLSVerify = true
|
||||
console.log('🔓 Set skipTLSVerify = true')
|
||||
}
|
||||
|
||||
// Create custom HTTPS agent that ignores certificate errors
|
||||
const httpsAgent = new https.Agent({
|
||||
rejectUnauthorized: false
|
||||
})
|
||||
console.log('🔓 Created HTTPS agent with rejectUnauthorized: false')
|
||||
|
||||
// Apply custom agent to the config
|
||||
try {
|
||||
k8sConfig.applyToHTTPSOptions({
|
||||
httpsAgent: httpsAgent
|
||||
} as any)
|
||||
console.log('✅ Applied custom HTTPS agent to K8s config')
|
||||
} catch (applyError: any) {
|
||||
console.error('❌ Failed to apply HTTPS options:', applyError.message)
|
||||
}
|
||||
} else {
|
||||
// Load from kubeconfig file
|
||||
const configPath = process.env.K8S_CONFIG_PATH || process.env.KUBECONFIG || '~/.kube/config'
|
||||
k8sConfig.loadFromFile(configPath)
|
||||
}
|
||||
|
||||
k8sClient = k8sConfig.makeApiClient(k8s.CoreV1Api)
|
||||
return k8sClient
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Kubernetes client
|
||||
*/
|
||||
export function getK8sClient(): k8s.CoreV1Api {
|
||||
if (!k8sClient) {
|
||||
return initK8sClient()
|
||||
}
|
||||
return k8sClient
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Kubernetes client with custom request options
|
||||
* This ensures the HTTPS agent is used for each request
|
||||
*/
|
||||
export function getK8sClientWithOptions(): { client: k8s.CoreV1Api, options: any } {
|
||||
const client = getK8sClient()
|
||||
|
||||
// Create request options with custom HTTPS agent
|
||||
const options = {
|
||||
httpsAgent: new https.Agent({
|
||||
rejectUnauthorized: false
|
||||
})
|
||||
}
|
||||
|
||||
return { client, options }
|
||||
}
|
||||
|
||||
/**
|
||||
* Create pod spec for agent
|
||||
*/
|
||||
export function createAgentPodSpec(podName: string, userId: string): k8s.V1Pod {
|
||||
return {
|
||||
metadata: {
|
||||
name: podName,
|
||||
labels: {
|
||||
app: 'claude-agent',
|
||||
userId: userId,
|
||||
podName: podName,
|
||||
'aiworker.io/agent': 'true',
|
||||
},
|
||||
},
|
||||
spec: {
|
||||
serviceAccountName: 'agent-sa',
|
||||
imagePullSecrets: [
|
||||
{
|
||||
name: 'gitea-registry',
|
||||
},
|
||||
],
|
||||
containers: [
|
||||
{
|
||||
name: 'agent',
|
||||
image: 'git.fuq.tv/admin/aiworker-agent:latest',
|
||||
imagePullPolicy: 'Always',
|
||||
ports: [
|
||||
{
|
||||
containerPort: 7681,
|
||||
name: 'terminal',
|
||||
},
|
||||
],
|
||||
env: [
|
||||
{
|
||||
name: 'BACKEND_URL',
|
||||
value: 'https://api.fuq.tv',
|
||||
},
|
||||
{
|
||||
name: 'MCP_ENDPOINT',
|
||||
value: 'https://api.fuq.tv/api/mcp',
|
||||
},
|
||||
{
|
||||
name: 'GITEA_URL',
|
||||
value: 'https://git.fuq.tv',
|
||||
},
|
||||
{
|
||||
name: 'GITEA_TOKEN',
|
||||
valueFrom: {
|
||||
secretKeyRef: {
|
||||
name: 'agent-secrets',
|
||||
key: 'gitea-token',
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'POD_NAME',
|
||||
valueFrom: {
|
||||
fieldRef: {
|
||||
fieldPath: 'metadata.name',
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'NAMESPACE',
|
||||
valueFrom: {
|
||||
fieldRef: {
|
||||
fieldPath: 'metadata.namespace',
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'USER_ID',
|
||||
value: userId,
|
||||
},
|
||||
],
|
||||
resources: {
|
||||
requests: {
|
||||
cpu: '500m',
|
||||
memory: '1Gi',
|
||||
},
|
||||
limits: {
|
||||
cpu: '2000m',
|
||||
memory: '4Gi',
|
||||
},
|
||||
},
|
||||
volumeMounts: [
|
||||
{
|
||||
name: 'workspace',
|
||||
mountPath: '/workspace',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
volumes: [
|
||||
{
|
||||
name: 'workspace',
|
||||
emptyDir: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create service for agent pod (for terminal access)
|
||||
*/
|
||||
export async function createAgentService(podName: string, agentId: string): Promise<void> {
|
||||
const client = getK8sClient()
|
||||
|
||||
const serviceSpec: k8s.V1Service = {
|
||||
metadata: {
|
||||
name: `${podName}-terminal`,
|
||||
namespace: 'agents',
|
||||
labels: {
|
||||
app: 'claude-agent-terminal',
|
||||
agentId: agentId,
|
||||
}
|
||||
},
|
||||
spec: {
|
||||
selector: {
|
||||
app: 'claude-agent',
|
||||
podName: podName,
|
||||
},
|
||||
ports: [{
|
||||
name: 'terminal',
|
||||
port: 7681,
|
||||
targetPort: 7681 as any,
|
||||
protocol: 'TCP'
|
||||
}],
|
||||
type: 'ClusterIP'
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await client.createNamespacedService({
|
||||
namespace: 'agents',
|
||||
body: serviceSpec
|
||||
})
|
||||
console.log(`✅ Service ${podName}-terminal created`)
|
||||
} catch (error: any) {
|
||||
console.error(`❌ Failed to create service:`, error.message)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete agent service
|
||||
*/
|
||||
export async function deleteAgentService(podName: string): Promise<void> {
|
||||
const client = getK8sClient()
|
||||
|
||||
try {
|
||||
await client.deleteNamespacedService({
|
||||
name: `${podName}-terminal`,
|
||||
namespace: 'agents'
|
||||
})
|
||||
console.log(`✅ Service ${podName}-terminal deleted`)
|
||||
} catch (error: any) {
|
||||
if (error.statusCode === 404 || error.response?.statusCode === 404) {
|
||||
console.log(`⚠️ Service ${podName}-terminal not found`)
|
||||
return
|
||||
}
|
||||
console.error(`❌ Error deleting service:`, error.message)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create agent pod in Kubernetes
|
||||
*/
|
||||
export async function createAgentPod(podName: string, userId: string, agentId: string): Promise<void> {
|
||||
const { client, options } = getK8sClientWithOptions()
|
||||
const podSpec = createAgentPodSpec(podName, userId)
|
||||
|
||||
console.log(`🔧 Creating pod ${podName} for user ${userId}`)
|
||||
console.log(`🔧 Using custom HTTPS agent with rejectUnauthorized: false`)
|
||||
|
||||
try {
|
||||
const result = await client.createNamespacedPod({
|
||||
namespace: 'agents',
|
||||
body: podSpec
|
||||
}, undefined, undefined, undefined, undefined, options)
|
||||
|
||||
console.log(`✅ Pod ${podName} created successfully`)
|
||||
if (result?.body?.metadata?.uid) {
|
||||
console.log(`✅ Pod UID: ${result.body.metadata.uid}`)
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`❌ Failed to create pod ${podName}`)
|
||||
console.error(`❌ Error message:`, error.message)
|
||||
console.error(`❌ Error code:`, error.code)
|
||||
if (error.response) {
|
||||
console.error(`❌ Response status:`, error.response.statusCode)
|
||||
console.error(`❌ Response body:`, error.response.body)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete agent pod from Kubernetes
|
||||
*/
|
||||
export async function deleteAgentPod(podName: string): Promise<void> {
|
||||
const client = getK8sClient()
|
||||
|
||||
try {
|
||||
await client.deleteNamespacedPod({
|
||||
name: podName,
|
||||
namespace: 'agents'
|
||||
})
|
||||
console.log(`✅ Pod ${podName} deleted successfully`)
|
||||
} catch (error: any) {
|
||||
// Ignore 404 errors (pod already deleted)
|
||||
if (error.statusCode === 404 || error.response?.statusCode === 404) {
|
||||
console.log(`⚠️ Pod ${podName} not found (already deleted)`)
|
||||
return
|
||||
}
|
||||
console.error(`❌ Failed to delete pod ${podName}:`, error.message)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pod status
|
||||
*/
|
||||
export async function getPodStatus(podName: string): Promise<string | null> {
|
||||
const client = getK8sClient()
|
||||
|
||||
try {
|
||||
const response = await client.readNamespacedPod({
|
||||
name: podName,
|
||||
namespace: 'agents'
|
||||
})
|
||||
|
||||
// Handle different response structures
|
||||
const pod = response.body || response
|
||||
return pod?.status?.phase || null
|
||||
} catch (error: any) {
|
||||
if (error.statusCode === 404 || error.response?.statusCode === 404) {
|
||||
return null
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pod IP address
|
||||
*/
|
||||
export async function getPodIP(podName: string): Promise<string | null> {
|
||||
const client = getK8sClient()
|
||||
|
||||
try {
|
||||
console.log(`🔍 Getting IP for pod: ${podName}`)
|
||||
const response = await client.readNamespacedPod({
|
||||
name: podName,
|
||||
namespace: 'agents'
|
||||
})
|
||||
|
||||
console.log(`🔍 Response type: ${typeof response}`)
|
||||
console.log(`🔍 Has body: ${'body' in response}`)
|
||||
|
||||
// Handle different response structures
|
||||
const pod = response.body || response
|
||||
const podIP = pod?.status?.podIP
|
||||
|
||||
console.log(`🔍 Pod IP: ${podIP}`)
|
||||
return podIP || null
|
||||
} catch (error: any) {
|
||||
console.error(`❌ Error getting pod IP for ${podName}:`, error.message)
|
||||
if (error.statusCode === 404 || error.response?.statusCode === 404) {
|
||||
return null
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user