Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

AgentCore 通信机制

本章详细说明容器内的 agentcore(智能体核心进程)如何与 Backend 以及最终用户进行双向实时通信。这是理解“用户发消息 → agentcore 处理 → 实时看到回复 → agentcore 还能反过来向用户提问“这条完整链路的关键。

设计原则:系统采用 SessionWorker(会话工作进程)模式——容器以 Deployment(部署)方式长驻运行,通过 WebSocket(网络套接字) 双向通道与 Backend 实时通信。所有数据库访问都收敛到 Backend;容器内的 Worker 只负责协议通信,不直接连接数据库。


1. 通信架构总览

1.1 三层通信模型

graph TB
    subgraph LOCAL["用户本地机器(同一台设备)"]
        U["用户"]
        H["Helper(前端)"]
        B["Backend(后端服务)"]
        subgraph K3S["K3s 集群"]
            W["SessionWorker Pod<br/>agentcore 进程"]
        end
    end

    U -->|"操作界面"| H
    H -->|"WebSocket<br/>双向实时通道"| B
    B <-->|"WebSocket<br/>双向实时通道"| W
    B -->|"推送更新"| H
    H -->|"渲染显示"| U

    style LOCAL fill:#f5f5f5,stroke:#333
    style K3S fill:#e8f4fd,stroke:#0969da

1.2 通信协议一览

链路协议方向说明
Backend ↔ WorkerWebSocket(推荐)双向实时Worker 启动后主动连接 Backend 的 WS 端点,所有消息和事件通过同一条持久连接传输
Helper ↔ BackendWebSocket双向实时Helper 连接 Backend 的 WS 端点,实时接收流式输出和状态更新
Helper → BackendHTTP REST请求-响应创建会话、创建运行、上传文件等操作仍使用 REST API
Backend ↔ Worker(降级)HTTP POST单向推送当 WebSocket 不可用时降级为 HTTP 回调模式

1.3 协议职责边界

组件负责什么不负责什么
Worker / agentcore推理、工具调用、文件读写、协议消息收发不直接查数据库,不直接持有数据库凭据
Backend会话管理、数据库查询、状态持久化、权限与审计不替 Worker 执行推理
Helper展示 UI、收集用户输入、展示流式结果不直接查数据库,不直接和 Pod 建立协议通道

因此 Backend ↔ Worker 的协议除了“消息推送”和“流式输出”,还必须承担:

  • Worker 请求 Backend 查询数据
  • Worker 请求 Backend 写入状态
  • Worker 请求读取 checkpoint
  • Worker 上报新的 checkpoint

1.4 为什么选择 WebSocket 而非 HTTP

对比维度HTTP POST 回调(旧方案)WebSocket(推荐方案)
流式输出开销每个 token 片段 = 1 次 HTTP 请求(约 300 字节头 + JSON 体),500 token = 500 次请求每个 token 片段 = 1 个 WS 帧(2-10 字节帧头 + JSON 体),开销降低 95%
连接管理每次请求需要 TCP 连接(即使用 keep-alive 也有头部开销)单个持久 TCP 连接,零连接建立开销
消息顺序HTTP 请求可能乱序到达(多连接并发时)TCP 保证帧顺序,无需额外序列号
背压控制无内建机制,Worker 可能 POST 过快压垮 BackendTCP 流控天然提供背压(backpressure),自动限速
双向通信需要两套独立的 HTTP 通道(Backend→Worker + Worker→Backend)同一条连接双向传输,架构更简单
连接状态感知需要轮询或心跳检测 Worker 是否存活连接断开时 Backend 立即感知
延迟本地部署约 1-2ms / 请求(TCP 握手 + HTTP 解析)本地部署约 0.1ms / 帧(仅帧解析)

1.5 协议语义总规则

类别语义代表消息
同步请求-响应必须等待结果再继续backend_querybackend_commandcheckpoint_getcheckpoint_put
异步事件流发出后继续执行stream_chunkstate_changetool_progress
混合语义提交异步,但业务流程暂停等待ask_user / user_reply

2. SessionWorker 容器内部架构

2.1 容器结构

SessionWorker 以 Deployment(部署)方式运行,每个 Pod 内包含:

+---------------------------------------------+
|  SessionWorker Pod                          |
|                                             |
|  +----------------------------------------+ |
|  |  启动脚本(bash)                       | |
|  |  1. 创建工作目录                        | |
|  |  2. 写入运行时元数据文件                 | |
|  |  3. 创建 ready 标记文件                 | |
|  |  4. 注册 TERM 信号处理(优雅关闭)       | |
|  |  5. 启动心跳循环(每 30 秒)            | |
|  +----------------------------------------+ |
|                                             |
|  +----------------------------------------+ |
|  |  agentcore 进程                        | |
|  |  * 建立 WS 连接到 Backend               | |
|  |  * 监听 :8081(HTTP 控制端口,探针用)   | |
|  |  * 通过 WS 接收用户消息                  | |
|  |  * 调用大模型 API 进行推理              | |
|  |  * 执行工具调用                         | |
|  |  * 通过 WS 推送结果给 Backend            | |
|  +----------------------------------------+ |
|                                             |
|  探针检查(保留 HTTP):                      |
|  * readinessProbe: test -f /tmp/agent-store-worker.ready |
|  * livenessProbe:  test -f /tmp/agent-store-worker.ready |
+---------------------------------------------+

注意:K8s 健康探针(readinessProbe / livenessProbe)仍使用文件检查方式,这是 K8s 原生支持的最简单方案。WebSocket 仅用于数据通信,不替代探针。

2.2 容器环境变量

Backend 通过环境变量向容器注入所有配置信息:

环境变量含义示例值
RUN_ID本次运行 ID550e8400-e29b-41d4-a716-446655440000
SESSION_ID会话 ID660e8400-e29b-41d4-a716-446655440001
USER_ID用户 IDuser-001
AGENT_PACKAGE_NAME智能体包名code-assistant
AGENT_PACKAGE_ROOT包文件挂载路径/opt/agent/package
AGENT_SESSION_STATE_ROOT会话本地目录(缓存/日志)/workspace/session
AGENT_CHECKPOINT_ROOT本地缓存或临时状态目录(可选)/workspace/session/checkpoints
AGENT_WORKLOAD_KIND工作负载类型session-worker
AGENT_RUNTIME_IMAGE_MODE镜像模式prewarmed-base
AGENT_CHECKPOINT_STORE_MODE检查点持久化策略backend-managed
AGENT_WORKER_SERVICE_NAMEWorker 的 K8s Service 名称agent-worker-svc-xxxx
AGENT_WORKER_EVENT_CHANNEL_KIND事件通道类型websocket(推荐) / http-callback(降级)
AGENT_WORKER_WS_URLWebSocket 连接地址(WS 模式)ws://host.k3s.internal:3000/ws/workers/RUN_ID
AGENT_WORKER_HTTP_CALLBACK_URL回调 URL(HTTP 降级模式)http://host.k3s.internal:3000/api/v1/worker-events
AGENT_WORKER_IDLE_TTL_SECONDS空闲超时时间(秒)300
AGENT_WORKER_MAX_LIFETIME_SECONDS最大存活时间(秒,可选)3600

3. WebSocket 通信协议详解(推荐方案)

3.1 连接建立

Worker 启动后,agentcore 主动发起 WebSocket 连接到 Backend:

sequenceDiagram
    participant W as Worker Pod
    participant B as Backend

    Note over W: agentcore 启动完成
    W->>B: WebSocket 握手 ws://backend:3000/ws/workers/RUN_ID
    B->>B: 验证 run_id, 注册 WS 连接
    B-->>W: 101 Switching Protocols
    Note over W,B: WebSocket 连接建立, 双向通道就绪
    W->>B: WS state_change IDLE
    Note over B: 标记 Worker 为 IDLE, 可以推送消息

WebSocket 端点设计:

Backend 端点用途连接方
ws://backend:PORT/ws/workers/RUN_IDBackend ↔ Worker 数据通道Worker 主动连接
ws://backend:PORT/ws/sessions/SESSION_IDBackend → Helper 实时推送Helper 主动连接

连接方向说明: Worker 主动连接到 Backend(而非 Backend 连接到 Worker),这样设计的好处是:

  • Worker 已通过环境变量知道 Backend 地址,无需额外服务发现
  • Backend 无需等待 K8s Service 就绪再连接
  • 连接生命周期与 Worker 生命周期天然绑定

3.2 消息帧格式

所有 WebSocket 消息均为 JSON 文本帧,统一格式:

{
  "type": "消息类型",
  "id": "消息唯一 ID(UUID)",
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {}
}

3.3 Backend → Worker 消息类型

消息类型用途说明
push_message推送用户消息给 agentcore包含用户输入、上下文、附件
user_reply转发用户对 ask_user 的回复包含回复内容和原始 questionId
cancel_task取消当前正在执行的任务agentcore 应尽快停止当前推理
backend_query_result返回 Worker 请求的数据查询结果对应同步查询
backend_command_result返回 Worker 请求的写入结果对应同步写入
checkpoint_get_result返回恢复所需的 checkpoint 数据对应同步恢复
checkpoint_put_ack确认 checkpoint 已被 Backend 持久化对应同步保存
ping心跳检查Worker 应回复 pong

push_message 示例:

{
  "type": "push_message",
  "id": "msg-uuid-001",
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "sessionId": "session-uuid",
    "runId": "run-uuid",
    "content": "请帮我分析这段代码",
    "artifacts": [
      { "name": "main.py", "relativePath": "src/main.py" }
    ],
    "context": {
      "history": [],
      "sessionRuntimeState": {}
    }
  }
}

user_reply 示例:

{
  "type": "user_reply",
  "id": "reply-uuid-001",
  "timestamp": "2024-01-15T10:30:05Z",
  "payload": {
    "questionId": "q-uuid-001",
    "answer": "确认删除",
    "approved": true
  }
}

3.4 Worker → Backend 消息类型

消息类型用途频率
stream_chunk流式输出的 token 片段高频:每 token 一条
message_response最终回答(完整或部分)每轮对话 1 条
ask_useragentcore 向用户提问按需
state_change生命周期状态变更状态切换时
tool_call工具调用通知(需要用户授权时)按需
backend_query请求 Backend 查询数据库按需,同步语义
backend_command请求 Backend 执行数据库写入按需,同步语义
checkpoint_get请求恢复 checkpoint启动或恢复时,同步语义
checkpoint_put上报最新 checkpoint关键步骤后,同步语义
error执行错误报告异常时
pong心跳回复响应 ping

stream_chunk 示例(最高频消息):

{
  "type": "stream_chunk",
  "id": "chunk-042",
  "timestamp": "2024-01-15T10:30:01.123Z",
  "payload": {
    "chunkIndex": 42,
    "content": "这段代码",
    "renderFormat": "markdown",
    "isComplete": false
  }
}

对比 HTTP 回调:同样的 stream_chunk 在 HTTP 模式下需要完整的 HTTP POST 请求(约 300 字节 HTTP 头 + 约 100 字节 JSON 体 = 约 400 字节),而 WebSocket 帧仅需约 2 字节帧头 + 约 100 字节 JSON 体 = 约 102 字节。单条消息开销降低约 75%

backend_query 示例(同步请求):

{
  "type": "backend_query",
  "id": "query-001",
  "timestamp": "2026-04-17T02:30:00Z",
  "payload": {
    "resource": "session_memory",
    "operation": "get",
    "arguments": {
      "sessionId": "session-uuid"
    }
  }
}

backend_query_result 示例:

{
  "type": "backend_query_result",
  "id": "query-001",
  "timestamp": "2026-04-17T02:30:00Z",
  "payload": {
    "ok": true,
    "data": {
      "summary": "最近 3 轮都在讨论数据库迁移"
    }
  }
}

checkpoint_put 示例:

{
  "type": "checkpoint_put",
  "id": "ckp-001",
  "timestamp": "2026-04-17T02:30:02Z",
  "payload": {
    "sessionId": "session-uuid",
    "runId": "run-uuid",
    "snapshot": {
      "historyCursor": 12,
      "memoryState": {},
      "toolState": {}
    }
  }
}

message_response 示例:

{
  "type": "message_response",
  "id": "resp-uuid-001",
  "timestamp": "2024-01-15T10:30:02Z",
  "payload": {
    "messageId": "resp-uuid-001",
    "content": "这段代码的主要问题是...",
    "renderFormat": "markdown",
    "isComplete": true,
    "usage": {
      "promptTokens": 1500,
      "completionTokens": 800,
      "totalTokens": 2300
    }
  }
}

ask_user 示例:

{
  "type": "ask_user",
  "id": "ask-uuid-001",
  "timestamp": "2024-01-15T10:30:01.500Z",
  "payload": {
    "questionId": "q-uuid-001",
    "question": "我需要删除这个文件,是否确认?",
    "questionType": "confirmation",
    "options": ["确认删除", "取消"],
    "timeout_seconds": 300
  }
}

state_change 示例:

{
  "type": "state_change",
  "id": "sc-uuid-001",
  "timestamp": "2024-01-15T10:30:00Z",
  "payload": {
    "previousState": "BUSY",
    "newState": "IDLE",
    "reason": "task completed"
  }
}

error 示例:

{
  "type": "error",
  "id": "err-uuid-001",
  "timestamp": "2024-01-15T10:30:03Z",
  "payload": {
    "errorCode": "LLM_API_TIMEOUT",
    "message": "调用大模型 API 超时",
    "recoverable": true,
    "details": {}
  }
}

3.5 WebSocket 连接管理

心跳机制

方向间隔超时动作
Backend → Worker每 15 秒发送 ping30 秒无 pongBackend 判定 Worker 异常,标记 FAILED
Worker → Backend收到 ping 立即回复 pong

WebSocket 协议本身支持 Ping/Pong 控制帧(RFC 6455),可以直接使用协议层心跳,无需应用层额外实现。

断线重连

当 WebSocket 连接意外断开时:

graph TD
    A["WS 连接断开"] --> B{"Worker 是否仍在运行?"}
    B -->|"是"| C["Worker 尝试重连<br/>指数退避: 1s, 2s, 4s, 8s..."]
    B -->|"否 进程退出"| D["Backend 标记 Worker FAILED"]
    C --> E{"重连成功?"}
    E -->|"是"| F["恢复通信<br/>Worker 发送 state_change"]
    E -->|"否, 超过 30 秒"| G["Worker 自行退出"]
    G --> D

重连策略:

  • 初始等待 1 秒,指数退避最大 8 秒
  • 最大重试时间 30 秒(与 terminationGracePeriodSeconds 一致)
  • 重连成功后 Worker 发送 state_change 通知 Backend 当前状态

3.6 数据访问为什么走协议而不是直连数据库

sequenceDiagram
    participant W as Worker
    participant B as Backend
    participant DB as 数据库

    W->>B: backend_query
    B->>DB: 查库
    DB-->>B: 返回结果
    B-->>W: backend_query_result

这样做的直接收益是:

  1. 数据库账号、密码、DSN 只保留在 Backend
  2. Worker 只面对统一协议,不区分 PostgreSQL / MySQL / Redis / 向量库
  3. Backend 可以在查库前做权限控制、审计、缓存和限流

优雅关闭

Worker 收到 SIGTERM
  -> 通过 WS 发送 state_change(DRAINING)
  -> Backend 停止推送新消息
  -> 等待当前任务完成
  -> 发送 WebSocket Close 帧(状态码 1000: Normal Closure)
  -> 关闭连接
  -> 进程退出

3.7 事件通道机制(WorkerEventChannel)

代码中的 WorkerEventChannel(工作进程事件通道)枚举定义了通信模式。推荐升级后支持三种:

通道类型环境变量工作方式状态适用场景
WebSocketAGENT_WORKER_WS_URLWorker 主动连接 Backend WS 端点,双向通信推荐所有场景,特别是流式输出
HttpCallbackAGENT_WORKER_HTTP_CALLBACK_URLWorker 主动 POST 到 Backend + Backend POST 到 Worker 8081 端口降级备选WebSocket 不可用时
QueueConsumerAGENT_WORKER_QUEUE_NAMEWorker 从消息队列拉取/推送预留未来高并发分布式场景

WorkerEventChannel Rust 类型定义(推荐升级后):

#![allow(unused)]
fn main() {
pub enum WorkerEventChannel {
    /// 推荐:Worker 主动连接 Backend 的 WebSocket 端点
    WebSocket { backend_ws_url: String },
    /// 降级:双向 HTTP POST 回调
    HttpCallback { callback_url: String },
    /// 预留:消息队列模式
    QueueConsumer { queue_name: String },
}
}

4. HTTP 降级模式(当前实现)

当 WebSocket 不可用时(如容器运行时不支持长连接),系统降级为 HTTP POST 回调模式。

4.1 Backend → Worker:HTTP 控制端口

Backend 通过 K8s Service 将 HTTP 请求转发到 Worker Pod 的 8081 端口(控制端口)。

端点方法用途
/messagesPOST推送用户消息给 agentcore
/cancelPOST取消当前正在执行的任务
/statusGET查询当前生命周期状态
/healthGET健康检查

4.2 Worker → Backend:HTTP 回调

Worker 通过 AGENT_WORKER_HTTP_CALLBACK_URL 向 Backend 推送事件。每种事件类型(stream_chunk、message_response、ask_user 等)都是一个独立的 HTTP POST 请求。

缺点:每个 stream_chunk token 片段都需要一个完整的 HTTP 请求,500 token 的回复将产生约 500 次 HTTP POST,这在性能和可靠性上都不理想。


5. Helper ↔ Backend API 详解

Helper 与 Backend 之间通过 WebSocket + HTTP REST 混合模式通信(均在本机 localhost)。

5.1 HTTP REST API(请求-响应操作)

这些 API 的语义也并不完全一样:

  • 同步GET /sessions/{id}POST /artifacts/uploadPOST /mount-grants
  • 混合POST /runsPOST /runs/{id}/cancel
  • 异步结果承载:真正的执行结果仍通过 WebSocket 返回
操作方法路径说明
健康检查GET/health返回 200 表示服务正常
就绪检查GET/ready返回 200 表示服务可接受请求
集群状态GET/api/v1/cluster/statusK3s 集群健康状态
运行时配置列表GET/api/v1/runtime-profiles可用的运行时环境配置
智能体包列表GET/api/v1/packages可用的智能体包
智能体包详情GET/api/v1/packages/NAME指定包的详细信息
会话列表GET/api/v1/sessions所有会话列表
创建会话POST/api/v1/sessions开启新对话会话
获取会话GET/api/v1/sessions/ID会话详情和对话历史
关闭会话POST/api/v1/sessions/ID/close关闭会话
重开会话POST/api/v1/sessions/ID/reopen重新打开已关闭的会话
运行列表GET/api/v1/runs所有运行记录
创建运行POST/api/v1/runs核心:发送消息并启动/复用 Worker
获取运行GET/api/v1/runs/ID运行状态和结果
取消运行POST/api/v1/runs/ID/cancel取消正在执行的运行
运行结果列表GET/api/v1/runs/ID/results输出文件列表
读取结果文件POST/api/v1/runs/ID/results/read读取指定输出文件内容
文件上传POST/api/v1/artifacts/upload上传用户文件
文件列表GET/api/v1/artifacts已上传文件列表
挂载授权列表GET/api/v1/mount-grants目录挂载授权列表
创建挂载授权POST/api/v1/mount-grants授权容器访问本地目录
审计事件GET/api/v1/audit-events操作审计日志

5.2 WebSocket 实时通道(流式推送)

Helper 连接 ws://localhost:PORT/ws/sessions/SESSION_ID,实时接收 Worker 的流式输出和状态更新:

Backend 推送的消息类型含义来源
stream_chunkAI 输出的 token 片段Worker 转发
message_response最终回答Worker 转发
ask_useragentcore 向用户提问Worker 转发
state_changeWorker 状态变更Worker 转发或 Backend 生成
run_status运行状态更新(SUBMITTED, COMPLETED 等)Backend 生成

5.3 创建运行(核心 API)请求格式

{
  "sessionId": "会话 UUID",
  "packageName": "智能体包名",
  "userPrompt": "用户发送的消息内容",
  "workloadKind": "sessionWorker",
  "runtimeProfile": "codex-standard",
  "runtimeImageMode": "prewarmedBase",
  "checkpointStoreMode": "backendManaged",
  "workerEventChannel": {
    "webSocket": {
      "backendWsUrl": "ws://host.k3s.internal:3000/ws/workers"
    }
  },
  "workerIdlePolicy": {
    "idleTtlSeconds": 300,
    "maxLifetimeSeconds": 3600
  },
  "providerEnv": {
    "ANTHROPIC_API_KEY": "sk-..."
  },
  "artifactIds": [],
  "externalServiceBindings": []
}

6. 完整交互时序

6.1 首次会话(Worker 冷启动 + WebSocket 建连)

sequenceDiagram
    participant U as 用户
    participant H as Helper
    participant B as Backend
    participant K as K3s API
    participant W as Worker Pod

    rect rgb(230, 245, 255)
    Note over U,W: 1 创建会话
    U->>H: 选择智能体, 开始对话
    H->>B: POST /api/v1/sessions
    B->>B: 创建 SessionRecord, 初始化状态目录
    B-->>H: 返回 session_id
    H->>B: 建立 WS 连接 ws://backend/ws/sessions/SESSION_ID
    end

    rect rgb(255, 245, 230)
    Note over U,W: 2 发送首条消息, 启动 Worker
    U->>H: 输入消息
    H->>B: POST /api/v1/runs
    B->>B: 构建 LaunchPlan
    B->>K: 创建 ConfigMap + Secret + Deployment
    B-->>H: 返回 run_id
    H-->>U: 显示启动中
    end

    rect rgb(230, 255, 230)
    Note over W: 3 Worker 启动并建立 WS 连接
    K->>W: 调度 Pod
    W->>W: 创建工作目录, 写入元数据, touch ready
    W->>B: WebSocket 连接 ws://backend/ws/workers/RUN_ID
    B-->>W: 101 连接建立
    W->>B: WS state_change IDLE
    Note over B: Worker 就绪
    end

    rect rgb(255, 230, 255)
    Note over U,W: 4 Backend 通过 WS 推送消息
    B->>W: WS push_message(用户消息 + 初始上下文)
    W->>W: agentcore 开始推理
    W->>B: WS backend_query(如需更多状态)
    B-->>W: WS backend_query_result
    W-->>B: WS stream_chunk x N
    B-->>H: WS 转发 stream_chunk
    H-->>U: 实时显示 AI 回复
    W->>B: WS message_response isComplete=true
    W->>B: WS checkpoint_put
    B->>B: 写入数据库并返回 ACK
    B->>H: WS run_status COMPLETED
    end

6.2 后续对话(Worker 已就绪,WS 连接已存在)

sequenceDiagram
    participant U as 用户
    participant H as Helper
    participant B as Backend
    participant W as Worker Pod

    U->>H: 继续对话
    H->>B: POST /api/v1/runs(同一 session_id)
    B->>B: 检测到 Worker IDLE 且 WS 连接存在
    B->>W: WS push_message(新消息)
    W->>W: agentcore 处理
    W->>B: WS backend_query(按需)
    B-->>W: WS backend_query_result
    W-->>B: WS stream_chunk x N
    B-->>H: WS 转发流式片段
    H-->>U: 实时显示
    W->>B: WS message_response
    Note over U,W: 无需重新创建资源, 延迟极低

6.3 agentcore 向用户提问(ask_user)

sequenceDiagram
    participant U as 用户
    participant H as Helper
    participant B as Backend
    participant W as Worker Pod

    W->>W: 执行工具调用前需要用户确认
    W->>B: WS ask_user
    B->>B: 标记 Worker 状态 WAITING_USER
    B->>H: WS ask_user(转发给 Helper)
    H->>U: 弹出确认对话框

    alt 用户同意
        U->>H: 点击确认
        H->>B: POST 用户回复
        B->>W: WS user_reply(approved=true)
        B->>B: 标记 Worker 状态 BUSY
        W->>W: 继续执行被中断的任务
    else 用户拒绝
        U->>H: 点击取消
        H->>B: POST 用户拒绝
        B->>W: WS user_reply(approved=false)
        W->>W: 跳过该工具调用, 继续推理
    end

7. Worker 生命周期状态机

7.1 状态定义

stateDiagram-v2
    [*] --> PENDING : Backend 创建 Deployment
    PENDING --> IDLE : Pod 就绪, WS 连接建立
    IDLE --> BUSY : Backend 通过 WS 推送消息
    BUSY --> WAITING_USER : agentcore 发送 ask_user
    WAITING_USER --> BUSY : 用户回复经 WS 推送
    BUSY --> IDLE : 任务完成
    IDLE --> DRAINING : 空闲超过 idle_ttl_seconds
    DRAINING --> [*] : 优雅关闭, WS Close
    PENDING --> FAILED : Pod 启动失败
    BUSY --> FAILED : agentcore 异常退出
    IDLE --> FAILED : WS 连接断开且重连失败

7.2 状态说明

状态含义WebSocket 状态Backend 行为
PENDINGDeployment 已创建,Pod 正在启动未连接等待 WS 连接
IDLEWorker 就绪,等待消息已连接可以推送消息
BUSY正在处理用户请求已连接,收发中接收流式事件
WAITING_USERagentcore 在等待用户回复已连接,等待中转发 ask_user,等待用户操作
DRAINING空闲超时,正在优雅关闭发送 Close 帧不再推送消息
FAILED启动或执行失败断开或未建立上报错误

7.3 空闲策略(WorkerIdlePolicy)

参数类型说明
idle_ttl_seconds必填Worker 空闲多久后自动关闭(秒)
max_lifetime_seconds可选Worker 最长存活时间,无论是否空闲

7.4 优雅关闭流程

当 K8s 发送 TERM 信号时,Worker 执行以下步骤:

1. 收到 SIGTERM 信号
2. 通过 WS 发送 state_change(DRAINING)
3. 写入时间戳到 /workspace/session/worker-draining.log
4. 删除 /tmp/agent-store-worker.ready 标记文件
5. readinessProbe 失败, K8s 停止路由新请求
6. 等待当前任务完成(terminationGracePeriodSeconds: 30 秒)
7. 发送 WebSocket Close 帧(1000 Normal Closure)
8. 进程退出

8. 安全机制

8.1 容器安全上下文

Worker Pod 的安全配置(已在代码中实现):

securityContext:
  runAsNonRoot: true           # 禁止 root 运行
  runAsUser: 1000              # 以 UID 1000 运行
  runAsGroup: 1000             # 以 GID 1000 运行
  fsGroup: 1000                # 文件系统组
  seccompProfile:
    type: RuntimeDefault       # 使用容器运行时默认 seccomp 配置
  allowPrivilegeEscalation: false  # 禁止提权
  capabilities:
    drop: ["ALL"]              # 丢弃所有 Linux capabilities
  readOnlyRootFilesystem: false    # 允许写入(agentcore 需要)

8.2 WebSocket 连接安全

安全措施说明
连接认证WS 握手时在 URL 中携带 run_id,Backend 验证该 run 存在且状态为 SUBMITTED
连接唯一性每个 run_id 只允许一条 WS 连接,重复连接将被拒绝
TLS(可选)本地部署时为 ws://(明文),生产环境应升级为 wss://(加密)
消息校验Backend 验证每条 WS 消息的 type 字段是否为已知类型

8.3 凭据传递

API 密钥和敏感凭据通过 K8s Secret 注入,出现在 ConfigMap 或环境变量明文中,也通过 WebSocket 传输:

envFrom:
  - secretRef:
      name: agent-provider-secret-RUN_ID
      optional: false

9. 本地工具桥接(规划中)

未来 Helper 计划提供“工具桥“(Tool Bridge)功能,让容器内的 agentcore 能间接调用用户本地的工具。WebSocket 的双向特性使得工具桥实现更加自然:

graph LR
    A["agentcore"] -->|"WS tool_call"| B["Backend"]
    B -->|"WS tool_request"| H["Helper"]
    H -->|"执行本地命令"| M["本地工具"]
    M -->|"返回结果"| H
    H -->|"WS tool_result"| B
    B -->|"WS user_reply"| A

本地部署优势:由于 Helper、Backend、K3s 都在同一台机器上,整条 WebSocket 链路全部走 localhost,延迟小于 1ms。


10. 技术实现要点

10.1 Backend 端(Rust / axum)

axum 的 WebSocket 支持只需在 Cargo.toml 中启用 ws feature:

# 当前
axum = { version = "0.8.6", features = ["multipart", "macros"] }

# 升级后
axum = { version = "0.8.6", features = ["multipart", "macros", "ws"] }

Backend 需要新增的路由:

#![allow(unused)]
fn main() {
// WebSocket 路由
.route("/ws/workers/:run_id", get(ws_worker_handler))
.route("/ws/sessions/:session_id", get(ws_session_handler))
}

10.2 Worker 端(agentcore)

agentcore 启动后根据 AGENT_WORKER_EVENT_CHANNEL_KIND 环境变量选择通信模式:

if AGENT_WORKER_EVENT_CHANNEL_KIND == "websocket":
    url = AGENT_WORKER_WS_URL
    连接 WebSocket, 双向通信
elif AGENT_WORKER_EVENT_CHANNEL_KIND == "http-callback":
    url = AGENT_WORKER_HTTP_CALLBACK_URL
    启动 HTTP 服务 :8081 + HTTP POST 回调

10.3 K8s Service 变化

WebSocket 模式下,Worker 不再需要 K8s Service 进行数据通信(因为是 Worker 主动连接 Backend),但 K8s Service 仍保留用于:

  • Backend 对 Worker 的 HTTP 健康检查(可选,因为 WS 连接状态本身就是健康指标)
  • 未来可能的直接 HTTP 调用(如管理接口)

下一步

了解了通信机制后,请继续阅读: