Files
aiworker/docs/05-agents/comunicacion.md
Hector Ros db71705842 Complete documentation for future sessions
- CLAUDE.md for AI agents to understand the codebase
- GITEA-GUIDE.md centralizes all Gitea operations (API, Registry, Auth)
- DEVELOPMENT-WORKFLOW.md explains complete dev process
- ROADMAP.md, NEXT-SESSION.md for planning
- QUICK-REFERENCE.md, TROUBLESHOOTING.md for daily use
- 40+ detailed docs in /docs folder
- Backend as submodule from Gitea

Everything documented for autonomous operation.

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-20 00:37:19 +01:00

12 KiB

Comunicación Agentes-Backend

Arquitectura de Comunicación

┌─────────────────────┐
│  Claude Code Agent  │
│     (Pod en K8s)    │
└──────────┬──────────┘
           │
           │ MCP Protocol
           │ (HTTP/JSON-RPC)
           │
           ▼
┌─────────────────────┐
│    MCP Server       │
│  (Backend Service)  │
└──────────┬──────────┘
           │
    ┌──────┴──────┐
    │             │
    ▼             ▼
┌────────┐    ┌────────┐
│ MySQL  │    │ Gitea  │
└────────┘    └────────┘

MCP Protocol Implementation

Request Format

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "get_next_task",
    "arguments": {
      "agentId": "agent-uuid"
    }
  }
}

Response Format

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"task\": {...}}"
      }
    ]
  }
}

HTTP Client en Agente

// agent/mcp-client.ts
class MCPClient {
  private baseUrl: string
  private agentId: string

  constructor(baseUrl: string, agentId: string) {
    this.baseUrl = baseUrl
    this.agentId = agentId
  }

  async callTool(toolName: string, args: any) {
    const response = await fetch(`${this.baseUrl}/rpc`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Agent-ID': this.agentId,
      },
      body: JSON.stringify({
        jsonrpc: '2.0',
        id: Date.now(),
        method: 'tools/call',
        params: {
          name: toolName,
          arguments: args,
        },
      }),
    })

    if (!response.ok) {
      throw new Error(`MCP call failed: ${response.statusText}`)
    }

    const data = await response.json()

    if (data.error) {
      throw new Error(data.error.message)
    }

    return data.result
  }

  async listTools() {
    const response = await fetch(`${this.baseUrl}/rpc`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Agent-ID': this.agentId,
      },
      body: JSON.stringify({
        jsonrpc: '2.0',
        id: Date.now(),
        method: 'tools/list',
        params: {},
      }),
    })

    const data = await response.json()
    return data.result.tools
  }
}

// Usage
const mcp = new MCPClient(
  process.env.MCP_SERVER_URL,
  process.env.AGENT_ID
)

const task = await mcp.callTool('get_next_task', {
  agentId: process.env.AGENT_ID,
})

Server-Side Handler

// backend: api/routes/mcp.ts
import { Router, Request, Response } from 'express'
import { handleToolCall } from '../../services/mcp/handlers'
import { tools } from '../../services/mcp/tools'
import { logger } from '../../utils/logger'

const router = Router()

// JSON-RPC endpoint
router.post('/rpc', async (req: Request, res: Response) => {
  const { jsonrpc, id, method, params } = req.body

  if (jsonrpc !== '2.0') {
    return res.status(400).json({
      jsonrpc: '2.0',
      id,
      error: {
        code: -32600,
        message: 'Invalid Request',
      },
    })
  }

  const agentId = req.headers['x-agent-id'] as string

  if (!agentId) {
    return res.status(401).json({
      jsonrpc: '2.0',
      id,
      error: {
        code: -32001,
        message: 'Missing agent ID',
      },
    })
  }

  try {
    switch (method) {
      case 'tools/list':
        return res.json({
          jsonrpc: '2.0',
          id,
          result: {
            tools: tools.map((t) => ({
              name: t.name,
              description: t.description,
              inputSchema: t.inputSchema,
            })),
          },
        })

      case 'tools/call':
        const { name, arguments: args } = params

        logger.info(`MCP call from ${agentId}: ${name}`)

        const result = await handleToolCall(name, {
          ...args,
          agentId,
        })

        return res.json({
          jsonrpc: '2.0',
          id,
          result,
        })

      default:
        return res.status(404).json({
          jsonrpc: '2.0',
          id,
          error: {
            code: -32601,
            message: 'Method not found',
          },
        })
    }
  } catch (error: any) {
    logger.error('MCP error:', error)

    return res.status(500).json({
      jsonrpc: '2.0',
      id,
      error: {
        code: -32603,
        message: 'Internal error',
        data: error.message,
      },
    })
  }
})

export default router

Heartbeat System

Agent-Side Heartbeat

# In agent pod
while true; do
  curl -s -X POST "$MCP_SERVER_URL/heartbeat" \
    -H "Content-Type: application/json" \
    -H "X-Agent-ID: $AGENT_ID" \
    -d "{\"status\":\"idle\"}"
  sleep 30
done &

Server-Side Heartbeat Handler

// api/routes/mcp.ts
router.post('/heartbeat', async (req: Request, res: Response) => {
  const agentId = req.headers['x-agent-id'] as string
  const { status } = req.body

  if (!agentId) {
    return res.status(401).json({ error: 'Missing agent ID' })
  }

  try {
    await db.update(agents)
      .set({
        lastHeartbeat: new Date(),
        status: status || 'idle',
      })
      .where(eq(agents.id, agentId))

    res.json({ success: true })
  } catch (error) {
    res.status(500).json({ error: 'Failed to update heartbeat' })
  }
})

WebSocket for Real-Time Updates

Alternativamente, para comunicación bidireccional en tiempo real:

// backend: api/websocket/agents.ts
import { Server as SocketIOServer } from 'socket.io'

export function setupAgentWebSocket(io: SocketIOServer) {
  const agentNamespace = io.of('/agents')

  agentNamespace.on('connection', (socket) => {
    const agentId = socket.handshake.query.agentId as string

    console.log(`Agent connected: ${agentId}`)

    // Join agent room
    socket.join(agentId)

    // Heartbeat
    socket.on('heartbeat', async (data) => {
      await db.update(agents)
        .set({
          lastHeartbeat: new Date(),
          status: data.status,
        })
        .where(eq(agents.id, agentId))
    })

    // Task updates
    socket.on('task_update', async (data) => {
      await db.update(tasks)
        .set({ state: data.state })
        .where(eq(tasks.id, data.taskId))

      // Notify frontend
      io.emit('task:status_changed', {
        taskId: data.taskId,
        newState: data.state,
      })
    })

    socket.on('disconnect', () => {
      console.log(`Agent disconnected: ${agentId}`)
    })
  })

  // Send task assignment to specific agent
  return {
    assignTask: (agentId: string, task: any) => {
      agentNamespace.to(agentId).emit('task_assigned', task)
    },
  }
}

Authentication & Security

JWT for Agents

// Generate agent token
import jwt from 'jsonwebtoken'

export function generateAgentToken(agentId: string) {
  return jwt.sign(
    {
      agentId,
      type: 'agent',
    },
    process.env.JWT_SECRET!,
    {
      expiresIn: '7d',
    }
  )
}

// Verify middleware
export function verifyAgentToken(req: Request, res: Response, next: NextFunction) {
  const token = req.headers.authorization?.replace('Bearer ', '')

  if (!token) {
    return res.status(401).json({ error: 'No token provided' })
  }

  try {
    const decoded = jwt.verify(token, process.env.JWT_SECRET!)
    req.agentId = decoded.agentId
    next()
  } catch (error) {
    return res.status(401).json({ error: 'Invalid token' })
  }
}

mTLS (Optional)

Para seguridad adicional, usar mTLS entre agentes y backend:

# Agent pod with client cert
volumes:
- name: agent-certs
  secret:
    secretName: agent-client-certs

volumeMounts:
- name: agent-certs
  mountPath: /etc/certs
  readOnly: true

env:
- name: MCP_CLIENT_CERT
  value: /etc/certs/client.crt
- name: MCP_CLIENT_KEY
  value: /etc/certs/client.key

Retry & Error Handling

// agent/mcp-client-with-retry.ts
class MCPClientWithRetry extends MCPClient {
  async callToolWithRetry(
    toolName: string,
    args: any,
    maxRetries = 3
  ) {
    let lastError

    for (let i = 0; i < maxRetries; i++) {
      try {
        return await this.callTool(toolName, args)
      } catch (error: any) {
        lastError = error
        console.error(`Attempt ${i + 1} failed:`, error.message)

        if (i < maxRetries - 1) {
          // Exponential backoff
          await sleep(Math.pow(2, i) * 1000)
        }
      }
    }

    throw lastError
  }
}

Circuit Breaker

// agent/circuit-breaker.ts
class CircuitBreaker {
  private failures = 0
  private lastFailureTime = 0
  private state: 'closed' | 'open' | 'half-open' = 'closed'

  private readonly threshold = 5
  private readonly timeout = 60000 // 1 minute

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = 'half-open'
      } else {
        throw new Error('Circuit breaker is open')
      }
    }

    try {
      const result = await fn()

      if (this.state === 'half-open') {
        this.state = 'closed'
        this.failures = 0
      }

      return result
    } catch (error) {
      this.failures++
      this.lastFailureTime = Date.now()

      if (this.failures >= this.threshold) {
        this.state = 'open'
      }

      throw error
    }
  }
}

// Usage
const breaker = new CircuitBreaker()

const task = await breaker.call(() =>
  mcp.callTool('get_next_task', { agentId })
)

Monitoring Communication

// backend: middleware/mcp-metrics.ts
import { Request, Response, NextFunction } from 'express'
import { logger } from '../utils/logger'

const metrics = {
  totalCalls: 0,
  successCalls: 0,
  failedCalls: 0,
  callDurations: [] as number[],
}

export function mcpMetricsMiddleware(
  req: Request,
  res: Response,
  next: NextFunction
) {
  const start = Date.now()
  metrics.totalCalls++

  res.on('finish', () => {
    const duration = Date.now() - start
    metrics.callDurations.push(duration)

    if (res.statusCode < 400) {
      metrics.successCalls++
    } else {
      metrics.failedCalls++
    }

    logger.debug('MCP call metrics', {
      method: req.body?.method,
      agentId: req.headers['x-agent-id'],
      duration,
      status: res.statusCode,
    })
  })

  next()
}

// Endpoint para ver métricas
router.get('/metrics', (req, res) => {
  res.json({
    total: metrics.totalCalls,
    success: metrics.successCalls,
    failed: metrics.failedCalls,
    avgDuration:
      metrics.callDurations.reduce((a, b) => a + b, 0) /
      metrics.callDurations.length,
  })
})

Testing MCP Communication

// test/mcp-client.test.ts
import { MCPClient } from '../agent/mcp-client'

describe('MCP Client', () => {
  let client: MCPClient

  beforeEach(() => {
    client = new MCPClient('http://localhost:3100', 'test-agent')
  })

  it('should list available tools', async () => {
    const tools = await client.listTools()
    expect(tools).toContainEqual(
      expect.objectContaining({ name: 'get_next_task' })
    )
  })

  it('should call tool successfully', async () => {
    const result = await client.callTool('heartbeat', {
      status: 'idle',
    })
    expect(result.content[0].text).toContain('success')
  })

  it('should handle errors', async () => {
    await expect(
      client.callTool('invalid_tool', {})
    ).rejects.toThrow()
  })
})