AgentCore 通信机制
本章详细说明容器内的 agentcore(智能体核心进程)如何与 Backend 以及最终用户进行双向实时通信。这是理解“用户发消息 → agentcore 处理 → 实时看到回复 → agentcore 还能反过来向用户提问“这条完整链路的关键。
设计原则:系统采用 SessionWorker(会话工作进程)模式——容器以 Deployment(部署)方式长驻运行,通过 WebSocket(网络套接字) 双向通道与 Backend 实时通信。所有数据库访问都收敛到 Backend;容器内的 Worker 只负责协议通信,不直接连接数据库。
1. 通信架构总览
1.1 三层通信模型
graph TB
subgraph LOCAL["用户本地机器(同一台设备)"]
U["用户"]
H["Helper(前端)"]
B["Backend(后端服务)"]
subgraph K3S["K3s 集群"]
W["SessionWorker Pod<br/>agentcore 进程"]
end
end
U -->|"操作界面"| H
H -->|"WebSocket<br/>双向实时通道"| B
B <-->|"WebSocket<br/>双向实时通道"| W
B -->|"推送更新"| H
H -->|"渲染显示"| U
style LOCAL fill:#f5f5f5,stroke:#333
style K3S fill:#e8f4fd,stroke:#0969da
1.2 通信协议一览
| 链路 | 协议 | 方向 | 说明 |
|---|---|---|---|
| Backend ↔ Worker | WebSocket(推荐) | 双向实时 | Worker 启动后主动连接 Backend 的 WS 端点,所有消息和事件通过同一条持久连接传输 |
| Helper ↔ Backend | WebSocket | 双向实时 | Helper 连接 Backend 的 WS 端点,实时接收流式输出和状态更新 |
| Helper → Backend | HTTP REST | 请求-响应 | 创建会话、创建运行、上传文件等操作仍使用 REST API |
| Backend ↔ Worker(降级) | HTTP POST | 单向推送 | 当 WebSocket 不可用时降级为 HTTP 回调模式 |
1.3 协议职责边界
| 组件 | 负责什么 | 不负责什么 |
|---|---|---|
| Worker / agentcore | 推理、工具调用、文件读写、协议消息收发 | 不直接查数据库,不直接持有数据库凭据 |
| Backend | 会话管理、数据库查询、状态持久化、权限与审计 | 不替 Worker 执行推理 |
| Helper | 展示 UI、收集用户输入、展示流式结果 | 不直接查数据库,不直接和 Pod 建立协议通道 |
因此 Backend ↔ Worker 的协议除了“消息推送”和“流式输出”,还必须承担:
- Worker 请求 Backend 查询数据
- Worker 请求 Backend 写入状态
- Worker 请求读取 checkpoint
- Worker 上报新的 checkpoint
1.4 为什么选择 WebSocket 而非 HTTP
| 对比维度 | HTTP POST 回调(旧方案) | WebSocket(推荐方案) |
|---|---|---|
| 流式输出开销 | 每个 token 片段 = 1 次 HTTP 请求(约 300 字节头 + JSON 体),500 token = 500 次请求 | 每个 token 片段 = 1 个 WS 帧(2-10 字节帧头 + JSON 体),开销降低 95% |
| 连接管理 | 每次请求需要 TCP 连接(即使用 keep-alive 也有头部开销) | 单个持久 TCP 连接,零连接建立开销 |
| 消息顺序 | HTTP 请求可能乱序到达(多连接并发时) | TCP 保证帧顺序,无需额外序列号 |
| 背压控制 | 无内建机制,Worker 可能 POST 过快压垮 Backend | TCP 流控天然提供背压(backpressure),自动限速 |
| 双向通信 | 需要两套独立的 HTTP 通道(Backend→Worker + Worker→Backend) | 同一条连接双向传输,架构更简单 |
| 连接状态感知 | 需要轮询或心跳检测 Worker 是否存活 | 连接断开时 Backend 立即感知 |
| 延迟 | 本地部署约 1-2ms / 请求(TCP 握手 + HTTP 解析) | 本地部署约 0.1ms / 帧(仅帧解析) |
1.5 协议语义总规则
| 类别 | 语义 | 代表消息 |
|---|---|---|
| 同步请求-响应 | 必须等待结果再继续 | backend_query、backend_command、checkpoint_get、checkpoint_put |
| 异步事件流 | 发出后继续执行 | stream_chunk、state_change、tool_progress |
| 混合语义 | 提交异步,但业务流程暂停等待 | ask_user / user_reply |
2. SessionWorker 容器内部架构
2.1 容器结构
SessionWorker 以 Deployment(部署)方式运行,每个 Pod 内包含:
+---------------------------------------------+
| SessionWorker Pod |
| |
| +----------------------------------------+ |
| | 启动脚本(bash) | |
| | 1. 创建工作目录 | |
| | 2. 写入运行时元数据文件 | |
| | 3. 创建 ready 标记文件 | |
| | 4. 注册 TERM 信号处理(优雅关闭) | |
| | 5. 启动心跳循环(每 30 秒) | |
| +----------------------------------------+ |
| |
| +----------------------------------------+ |
| | agentcore 进程 | |
| | * 建立 WS 连接到 Backend | |
| | * 监听 :8081(HTTP 控制端口,探针用) | |
| | * 通过 WS 接收用户消息 | |
| | * 调用大模型 API 进行推理 | |
| | * 执行工具调用 | |
| | * 通过 WS 推送结果给 Backend | |
| +----------------------------------------+ |
| |
| 探针检查(保留 HTTP): |
| * readinessProbe: test -f /tmp/agent-store-worker.ready |
| * livenessProbe: test -f /tmp/agent-store-worker.ready |
+---------------------------------------------+
注意:K8s 健康探针(readinessProbe / livenessProbe)仍使用文件检查方式,这是 K8s 原生支持的最简单方案。WebSocket 仅用于数据通信,不替代探针。
2.2 容器环境变量
Backend 通过环境变量向容器注入所有配置信息:
| 环境变量 | 含义 | 示例值 |
|---|---|---|
RUN_ID | 本次运行 ID | 550e8400-e29b-41d4-a716-446655440000 |
SESSION_ID | 会话 ID | 660e8400-e29b-41d4-a716-446655440001 |
USER_ID | 用户 ID | user-001 |
AGENT_PACKAGE_NAME | 智能体包名 | code-assistant |
AGENT_PACKAGE_ROOT | 包文件挂载路径 | /opt/agent/package |
AGENT_SESSION_STATE_ROOT | 会话本地目录(缓存/日志) | /workspace/session |
AGENT_CHECKPOINT_ROOT | 本地缓存或临时状态目录(可选) | /workspace/session/checkpoints |
AGENT_WORKLOAD_KIND | 工作负载类型 | session-worker |
AGENT_RUNTIME_IMAGE_MODE | 镜像模式 | prewarmed-base |
AGENT_CHECKPOINT_STORE_MODE | 检查点持久化策略 | backend-managed |
AGENT_WORKER_SERVICE_NAME | Worker 的 K8s Service 名称 | agent-worker-svc-xxxx |
AGENT_WORKER_EVENT_CHANNEL_KIND | 事件通道类型 | websocket(推荐) / http-callback(降级) |
AGENT_WORKER_WS_URL | WebSocket 连接地址(WS 模式) | ws://host.k3s.internal:3000/ws/workers/RUN_ID |
AGENT_WORKER_HTTP_CALLBACK_URL | 回调 URL(HTTP 降级模式) | http://host.k3s.internal:3000/api/v1/worker-events |
AGENT_WORKER_IDLE_TTL_SECONDS | 空闲超时时间(秒) | 300 |
AGENT_WORKER_MAX_LIFETIME_SECONDS | 最大存活时间(秒,可选) | 3600 |
3. WebSocket 通信协议详解(推荐方案)
3.1 连接建立
Worker 启动后,agentcore 主动发起 WebSocket 连接到 Backend:
sequenceDiagram
participant W as Worker Pod
participant B as Backend
Note over W: agentcore 启动完成
W->>B: WebSocket 握手 ws://backend:3000/ws/workers/RUN_ID
B->>B: 验证 run_id, 注册 WS 连接
B-->>W: 101 Switching Protocols
Note over W,B: WebSocket 连接建立, 双向通道就绪
W->>B: WS state_change IDLE
Note over B: 标记 Worker 为 IDLE, 可以推送消息
WebSocket 端点设计:
| Backend 端点 | 用途 | 连接方 |
|---|---|---|
ws://backend:PORT/ws/workers/RUN_ID | Backend ↔ Worker 数据通道 | Worker 主动连接 |
ws://backend:PORT/ws/sessions/SESSION_ID | Backend → Helper 实时推送 | Helper 主动连接 |
连接方向说明: Worker 主动连接到 Backend(而非 Backend 连接到 Worker),这样设计的好处是:
- Worker 已通过环境变量知道 Backend 地址,无需额外服务发现
- Backend 无需等待 K8s Service 就绪再连接
- 连接生命周期与 Worker 生命周期天然绑定
3.2 消息帧格式
所有 WebSocket 消息均为 JSON 文本帧,统一格式:
{
"type": "消息类型",
"id": "消息唯一 ID(UUID)",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {}
}
3.3 Backend → Worker 消息类型
| 消息类型 | 用途 | 说明 |
|---|---|---|
push_message | 推送用户消息给 agentcore | 包含用户输入、上下文、附件 |
user_reply | 转发用户对 ask_user 的回复 | 包含回复内容和原始 questionId |
cancel_task | 取消当前正在执行的任务 | agentcore 应尽快停止当前推理 |
backend_query_result | 返回 Worker 请求的数据查询结果 | 对应同步查询 |
backend_command_result | 返回 Worker 请求的写入结果 | 对应同步写入 |
checkpoint_get_result | 返回恢复所需的 checkpoint 数据 | 对应同步恢复 |
checkpoint_put_ack | 确认 checkpoint 已被 Backend 持久化 | 对应同步保存 |
ping | 心跳检查 | Worker 应回复 pong |
push_message 示例:
{
"type": "push_message",
"id": "msg-uuid-001",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"sessionId": "session-uuid",
"runId": "run-uuid",
"content": "请帮我分析这段代码",
"artifacts": [
{ "name": "main.py", "relativePath": "src/main.py" }
],
"context": {
"history": [],
"sessionRuntimeState": {}
}
}
}
user_reply 示例:
{
"type": "user_reply",
"id": "reply-uuid-001",
"timestamp": "2024-01-15T10:30:05Z",
"payload": {
"questionId": "q-uuid-001",
"answer": "确认删除",
"approved": true
}
}
3.4 Worker → Backend 消息类型
| 消息类型 | 用途 | 频率 |
|---|---|---|
stream_chunk | 流式输出的 token 片段 | 高频:每 token 一条 |
message_response | 最终回答(完整或部分) | 每轮对话 1 条 |
ask_user | agentcore 向用户提问 | 按需 |
state_change | 生命周期状态变更 | 状态切换时 |
tool_call | 工具调用通知(需要用户授权时) | 按需 |
backend_query | 请求 Backend 查询数据库 | 按需,同步语义 |
backend_command | 请求 Backend 执行数据库写入 | 按需,同步语义 |
checkpoint_get | 请求恢复 checkpoint | 启动或恢复时,同步语义 |
checkpoint_put | 上报最新 checkpoint | 关键步骤后,同步语义 |
error | 执行错误报告 | 异常时 |
pong | 心跳回复 | 响应 ping |
stream_chunk 示例(最高频消息):
{
"type": "stream_chunk",
"id": "chunk-042",
"timestamp": "2024-01-15T10:30:01.123Z",
"payload": {
"chunkIndex": 42,
"content": "这段代码",
"renderFormat": "markdown",
"isComplete": false
}
}
对比 HTTP 回调:同样的
stream_chunk在 HTTP 模式下需要完整的 HTTP POST 请求(约 300 字节 HTTP 头 + 约 100 字节 JSON 体 = 约 400 字节),而 WebSocket 帧仅需约 2 字节帧头 + 约 100 字节 JSON 体 = 约 102 字节。单条消息开销降低约 75%。
backend_query 示例(同步请求):
{
"type": "backend_query",
"id": "query-001",
"timestamp": "2026-04-17T02:30:00Z",
"payload": {
"resource": "session_memory",
"operation": "get",
"arguments": {
"sessionId": "session-uuid"
}
}
}
backend_query_result 示例:
{
"type": "backend_query_result",
"id": "query-001",
"timestamp": "2026-04-17T02:30:00Z",
"payload": {
"ok": true,
"data": {
"summary": "最近 3 轮都在讨论数据库迁移"
}
}
}
checkpoint_put 示例:
{
"type": "checkpoint_put",
"id": "ckp-001",
"timestamp": "2026-04-17T02:30:02Z",
"payload": {
"sessionId": "session-uuid",
"runId": "run-uuid",
"snapshot": {
"historyCursor": 12,
"memoryState": {},
"toolState": {}
}
}
}
message_response 示例:
{
"type": "message_response",
"id": "resp-uuid-001",
"timestamp": "2024-01-15T10:30:02Z",
"payload": {
"messageId": "resp-uuid-001",
"content": "这段代码的主要问题是...",
"renderFormat": "markdown",
"isComplete": true,
"usage": {
"promptTokens": 1500,
"completionTokens": 800,
"totalTokens": 2300
}
}
}
ask_user 示例:
{
"type": "ask_user",
"id": "ask-uuid-001",
"timestamp": "2024-01-15T10:30:01.500Z",
"payload": {
"questionId": "q-uuid-001",
"question": "我需要删除这个文件,是否确认?",
"questionType": "confirmation",
"options": ["确认删除", "取消"],
"timeout_seconds": 300
}
}
state_change 示例:
{
"type": "state_change",
"id": "sc-uuid-001",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"previousState": "BUSY",
"newState": "IDLE",
"reason": "task completed"
}
}
error 示例:
{
"type": "error",
"id": "err-uuid-001",
"timestamp": "2024-01-15T10:30:03Z",
"payload": {
"errorCode": "LLM_API_TIMEOUT",
"message": "调用大模型 API 超时",
"recoverable": true,
"details": {}
}
}
3.5 WebSocket 连接管理
心跳机制
| 方向 | 间隔 | 超时 | 动作 |
|---|---|---|---|
| Backend → Worker | 每 15 秒发送 ping | 30 秒无 pong | Backend 判定 Worker 异常,标记 FAILED |
| Worker → Backend | 收到 ping 立即回复 pong | — | — |
WebSocket 协议本身支持 Ping/Pong 控制帧(RFC 6455),可以直接使用协议层心跳,无需应用层额外实现。
断线重连
当 WebSocket 连接意外断开时:
graph TD
A["WS 连接断开"] --> B{"Worker 是否仍在运行?"}
B -->|"是"| C["Worker 尝试重连<br/>指数退避: 1s, 2s, 4s, 8s..."]
B -->|"否 进程退出"| D["Backend 标记 Worker FAILED"]
C --> E{"重连成功?"}
E -->|"是"| F["恢复通信<br/>Worker 发送 state_change"]
E -->|"否, 超过 30 秒"| G["Worker 自行退出"]
G --> D
重连策略:
- 初始等待 1 秒,指数退避最大 8 秒
- 最大重试时间 30 秒(与 terminationGracePeriodSeconds 一致)
- 重连成功后 Worker 发送
state_change通知 Backend 当前状态
3.6 数据访问为什么走协议而不是直连数据库
sequenceDiagram
participant W as Worker
participant B as Backend
participant DB as 数据库
W->>B: backend_query
B->>DB: 查库
DB-->>B: 返回结果
B-->>W: backend_query_result
这样做的直接收益是:
- 数据库账号、密码、DSN 只保留在 Backend
- Worker 只面对统一协议,不区分 PostgreSQL / MySQL / Redis / 向量库
- Backend 可以在查库前做权限控制、审计、缓存和限流
优雅关闭
Worker 收到 SIGTERM
-> 通过 WS 发送 state_change(DRAINING)
-> Backend 停止推送新消息
-> 等待当前任务完成
-> 发送 WebSocket Close 帧(状态码 1000: Normal Closure)
-> 关闭连接
-> 进程退出
3.7 事件通道机制(WorkerEventChannel)
代码中的 WorkerEventChannel(工作进程事件通道)枚举定义了通信模式。推荐升级后支持三种:
| 通道类型 | 环境变量 | 工作方式 | 状态 | 适用场景 |
|---|---|---|---|---|
| WebSocket | AGENT_WORKER_WS_URL | Worker 主动连接 Backend WS 端点,双向通信 | 推荐 | 所有场景,特别是流式输出 |
| HttpCallback | AGENT_WORKER_HTTP_CALLBACK_URL | Worker 主动 POST 到 Backend + Backend POST 到 Worker 8081 端口 | 降级备选 | WebSocket 不可用时 |
| QueueConsumer | AGENT_WORKER_QUEUE_NAME | Worker 从消息队列拉取/推送 | 预留 | 未来高并发分布式场景 |
WorkerEventChannel Rust 类型定义(推荐升级后):
#![allow(unused)]
fn main() {
pub enum WorkerEventChannel {
/// 推荐:Worker 主动连接 Backend 的 WebSocket 端点
WebSocket { backend_ws_url: String },
/// 降级:双向 HTTP POST 回调
HttpCallback { callback_url: String },
/// 预留:消息队列模式
QueueConsumer { queue_name: String },
}
}
4. HTTP 降级模式(当前实现)
当 WebSocket 不可用时(如容器运行时不支持长连接),系统降级为 HTTP POST 回调模式。
4.1 Backend → Worker:HTTP 控制端口
Backend 通过 K8s Service 将 HTTP 请求转发到 Worker Pod 的 8081 端口(控制端口)。
| 端点 | 方法 | 用途 |
|---|---|---|
/messages | POST | 推送用户消息给 agentcore |
/cancel | POST | 取消当前正在执行的任务 |
/status | GET | 查询当前生命周期状态 |
/health | GET | 健康检查 |
4.2 Worker → Backend:HTTP 回调
Worker 通过 AGENT_WORKER_HTTP_CALLBACK_URL 向 Backend 推送事件。每种事件类型(stream_chunk、message_response、ask_user 等)都是一个独立的 HTTP POST 请求。
缺点:每个
stream_chunktoken 片段都需要一个完整的 HTTP 请求,500 token 的回复将产生约 500 次 HTTP POST,这在性能和可靠性上都不理想。
5. Helper ↔ Backend API 详解
Helper 与 Backend 之间通过 WebSocket + HTTP REST 混合模式通信(均在本机 localhost)。
5.1 HTTP REST API(请求-响应操作)
这些 API 的语义也并不完全一样:
- 同步:
GET /sessions/{id}、POST /artifacts/upload、POST /mount-grants - 混合:
POST /runs、POST /runs/{id}/cancel - 异步结果承载:真正的执行结果仍通过 WebSocket 返回
| 操作 | 方法 | 路径 | 说明 |
|---|---|---|---|
| 健康检查 | GET | /health | 返回 200 表示服务正常 |
| 就绪检查 | GET | /ready | 返回 200 表示服务可接受请求 |
| 集群状态 | GET | /api/v1/cluster/status | K3s 集群健康状态 |
| 运行时配置列表 | GET | /api/v1/runtime-profiles | 可用的运行时环境配置 |
| 智能体包列表 | GET | /api/v1/packages | 可用的智能体包 |
| 智能体包详情 | GET | /api/v1/packages/NAME | 指定包的详细信息 |
| 会话列表 | GET | /api/v1/sessions | 所有会话列表 |
| 创建会话 | POST | /api/v1/sessions | 开启新对话会话 |
| 获取会话 | GET | /api/v1/sessions/ID | 会话详情和对话历史 |
| 关闭会话 | POST | /api/v1/sessions/ID/close | 关闭会话 |
| 重开会话 | POST | /api/v1/sessions/ID/reopen | 重新打开已关闭的会话 |
| 运行列表 | GET | /api/v1/runs | 所有运行记录 |
| 创建运行 | POST | /api/v1/runs | 核心:发送消息并启动/复用 Worker |
| 获取运行 | GET | /api/v1/runs/ID | 运行状态和结果 |
| 取消运行 | POST | /api/v1/runs/ID/cancel | 取消正在执行的运行 |
| 运行结果列表 | GET | /api/v1/runs/ID/results | 输出文件列表 |
| 读取结果文件 | POST | /api/v1/runs/ID/results/read | 读取指定输出文件内容 |
| 文件上传 | POST | /api/v1/artifacts/upload | 上传用户文件 |
| 文件列表 | GET | /api/v1/artifacts | 已上传文件列表 |
| 挂载授权列表 | GET | /api/v1/mount-grants | 目录挂载授权列表 |
| 创建挂载授权 | POST | /api/v1/mount-grants | 授权容器访问本地目录 |
| 审计事件 | GET | /api/v1/audit-events | 操作审计日志 |
5.2 WebSocket 实时通道(流式推送)
Helper 连接 ws://localhost:PORT/ws/sessions/SESSION_ID,实时接收 Worker 的流式输出和状态更新:
| Backend 推送的消息类型 | 含义 | 来源 |
|---|---|---|
stream_chunk | AI 输出的 token 片段 | Worker 转发 |
message_response | 最终回答 | Worker 转发 |
ask_user | agentcore 向用户提问 | Worker 转发 |
state_change | Worker 状态变更 | Worker 转发或 Backend 生成 |
run_status | 运行状态更新(SUBMITTED, COMPLETED 等) | Backend 生成 |
5.3 创建运行(核心 API)请求格式
{
"sessionId": "会话 UUID",
"packageName": "智能体包名",
"userPrompt": "用户发送的消息内容",
"workloadKind": "sessionWorker",
"runtimeProfile": "codex-standard",
"runtimeImageMode": "prewarmedBase",
"checkpointStoreMode": "backendManaged",
"workerEventChannel": {
"webSocket": {
"backendWsUrl": "ws://host.k3s.internal:3000/ws/workers"
}
},
"workerIdlePolicy": {
"idleTtlSeconds": 300,
"maxLifetimeSeconds": 3600
},
"providerEnv": {
"ANTHROPIC_API_KEY": "sk-..."
},
"artifactIds": [],
"externalServiceBindings": []
}
6. 完整交互时序
6.1 首次会话(Worker 冷启动 + WebSocket 建连)
sequenceDiagram
participant U as 用户
participant H as Helper
participant B as Backend
participant K as K3s API
participant W as Worker Pod
rect rgb(230, 245, 255)
Note over U,W: 1 创建会话
U->>H: 选择智能体, 开始对话
H->>B: POST /api/v1/sessions
B->>B: 创建 SessionRecord, 初始化状态目录
B-->>H: 返回 session_id
H->>B: 建立 WS 连接 ws://backend/ws/sessions/SESSION_ID
end
rect rgb(255, 245, 230)
Note over U,W: 2 发送首条消息, 启动 Worker
U->>H: 输入消息
H->>B: POST /api/v1/runs
B->>B: 构建 LaunchPlan
B->>K: 创建 ConfigMap + Secret + Deployment
B-->>H: 返回 run_id
H-->>U: 显示启动中
end
rect rgb(230, 255, 230)
Note over W: 3 Worker 启动并建立 WS 连接
K->>W: 调度 Pod
W->>W: 创建工作目录, 写入元数据, touch ready
W->>B: WebSocket 连接 ws://backend/ws/workers/RUN_ID
B-->>W: 101 连接建立
W->>B: WS state_change IDLE
Note over B: Worker 就绪
end
rect rgb(255, 230, 255)
Note over U,W: 4 Backend 通过 WS 推送消息
B->>W: WS push_message(用户消息 + 初始上下文)
W->>W: agentcore 开始推理
W->>B: WS backend_query(如需更多状态)
B-->>W: WS backend_query_result
W-->>B: WS stream_chunk x N
B-->>H: WS 转发 stream_chunk
H-->>U: 实时显示 AI 回复
W->>B: WS message_response isComplete=true
W->>B: WS checkpoint_put
B->>B: 写入数据库并返回 ACK
B->>H: WS run_status COMPLETED
end
6.2 后续对话(Worker 已就绪,WS 连接已存在)
sequenceDiagram
participant U as 用户
participant H as Helper
participant B as Backend
participant W as Worker Pod
U->>H: 继续对话
H->>B: POST /api/v1/runs(同一 session_id)
B->>B: 检测到 Worker IDLE 且 WS 连接存在
B->>W: WS push_message(新消息)
W->>W: agentcore 处理
W->>B: WS backend_query(按需)
B-->>W: WS backend_query_result
W-->>B: WS stream_chunk x N
B-->>H: WS 转发流式片段
H-->>U: 实时显示
W->>B: WS message_response
Note over U,W: 无需重新创建资源, 延迟极低
6.3 agentcore 向用户提问(ask_user)
sequenceDiagram
participant U as 用户
participant H as Helper
participant B as Backend
participant W as Worker Pod
W->>W: 执行工具调用前需要用户确认
W->>B: WS ask_user
B->>B: 标记 Worker 状态 WAITING_USER
B->>H: WS ask_user(转发给 Helper)
H->>U: 弹出确认对话框
alt 用户同意
U->>H: 点击确认
H->>B: POST 用户回复
B->>W: WS user_reply(approved=true)
B->>B: 标记 Worker 状态 BUSY
W->>W: 继续执行被中断的任务
else 用户拒绝
U->>H: 点击取消
H->>B: POST 用户拒绝
B->>W: WS user_reply(approved=false)
W->>W: 跳过该工具调用, 继续推理
end
7. Worker 生命周期状态机
7.1 状态定义
stateDiagram-v2
[*] --> PENDING : Backend 创建 Deployment
PENDING --> IDLE : Pod 就绪, WS 连接建立
IDLE --> BUSY : Backend 通过 WS 推送消息
BUSY --> WAITING_USER : agentcore 发送 ask_user
WAITING_USER --> BUSY : 用户回复经 WS 推送
BUSY --> IDLE : 任务完成
IDLE --> DRAINING : 空闲超过 idle_ttl_seconds
DRAINING --> [*] : 优雅关闭, WS Close
PENDING --> FAILED : Pod 启动失败
BUSY --> FAILED : agentcore 异常退出
IDLE --> FAILED : WS 连接断开且重连失败
7.2 状态说明
| 状态 | 含义 | WebSocket 状态 | Backend 行为 |
|---|---|---|---|
| PENDING | Deployment 已创建,Pod 正在启动 | 未连接 | 等待 WS 连接 |
| IDLE | Worker 就绪,等待消息 | 已连接 | 可以推送消息 |
| BUSY | 正在处理用户请求 | 已连接,收发中 | 接收流式事件 |
| WAITING_USER | agentcore 在等待用户回复 | 已连接,等待中 | 转发 ask_user,等待用户操作 |
| DRAINING | 空闲超时,正在优雅关闭 | 发送 Close 帧 | 不再推送消息 |
| FAILED | 启动或执行失败 | 断开或未建立 | 上报错误 |
7.3 空闲策略(WorkerIdlePolicy)
| 参数 | 类型 | 说明 |
|---|---|---|
idle_ttl_seconds | 必填 | Worker 空闲多久后自动关闭(秒) |
max_lifetime_seconds | 可选 | Worker 最长存活时间,无论是否空闲 |
7.4 优雅关闭流程
当 K8s 发送 TERM 信号时,Worker 执行以下步骤:
1. 收到 SIGTERM 信号
2. 通过 WS 发送 state_change(DRAINING)
3. 写入时间戳到 /workspace/session/worker-draining.log
4. 删除 /tmp/agent-store-worker.ready 标记文件
5. readinessProbe 失败, K8s 停止路由新请求
6. 等待当前任务完成(terminationGracePeriodSeconds: 30 秒)
7. 发送 WebSocket Close 帧(1000 Normal Closure)
8. 进程退出
8. 安全机制
8.1 容器安全上下文
Worker Pod 的安全配置(已在代码中实现):
securityContext:
runAsNonRoot: true # 禁止 root 运行
runAsUser: 1000 # 以 UID 1000 运行
runAsGroup: 1000 # 以 GID 1000 运行
fsGroup: 1000 # 文件系统组
seccompProfile:
type: RuntimeDefault # 使用容器运行时默认 seccomp 配置
allowPrivilegeEscalation: false # 禁止提权
capabilities:
drop: ["ALL"] # 丢弃所有 Linux capabilities
readOnlyRootFilesystem: false # 允许写入(agentcore 需要)
8.2 WebSocket 连接安全
| 安全措施 | 说明 |
|---|---|
| 连接认证 | WS 握手时在 URL 中携带 run_id,Backend 验证该 run 存在且状态为 SUBMITTED |
| 连接唯一性 | 每个 run_id 只允许一条 WS 连接,重复连接将被拒绝 |
| TLS(可选) | 本地部署时为 ws://(明文),生产环境应升级为 wss://(加密) |
| 消息校验 | Backend 验证每条 WS 消息的 type 字段是否为已知类型 |
8.3 凭据传递
API 密钥和敏感凭据通过 K8s Secret 注入,不出现在 ConfigMap 或环境变量明文中,也不通过 WebSocket 传输:
envFrom:
- secretRef:
name: agent-provider-secret-RUN_ID
optional: false
9. 本地工具桥接(规划中)
未来 Helper 计划提供“工具桥“(Tool Bridge)功能,让容器内的 agentcore 能间接调用用户本地的工具。WebSocket 的双向特性使得工具桥实现更加自然:
graph LR
A["agentcore"] -->|"WS tool_call"| B["Backend"]
B -->|"WS tool_request"| H["Helper"]
H -->|"执行本地命令"| M["本地工具"]
M -->|"返回结果"| H
H -->|"WS tool_result"| B
B -->|"WS user_reply"| A
本地部署优势:由于 Helper、Backend、K3s 都在同一台机器上,整条 WebSocket 链路全部走 localhost,延迟小于 1ms。
10. 技术实现要点
10.1 Backend 端(Rust / axum)
axum 的 WebSocket 支持只需在 Cargo.toml 中启用 ws feature:
# 当前
axum = { version = "0.8.6", features = ["multipart", "macros"] }
# 升级后
axum = { version = "0.8.6", features = ["multipart", "macros", "ws"] }
Backend 需要新增的路由:
#![allow(unused)]
fn main() {
// WebSocket 路由
.route("/ws/workers/:run_id", get(ws_worker_handler))
.route("/ws/sessions/:session_id", get(ws_session_handler))
}
10.2 Worker 端(agentcore)
agentcore 启动后根据 AGENT_WORKER_EVENT_CHANNEL_KIND 环境变量选择通信模式:
if AGENT_WORKER_EVENT_CHANNEL_KIND == "websocket":
url = AGENT_WORKER_WS_URL
连接 WebSocket, 双向通信
elif AGENT_WORKER_EVENT_CHANNEL_KIND == "http-callback":
url = AGENT_WORKER_HTTP_CALLBACK_URL
启动 HTTP 服务 :8081 + HTTP POST 回调
10.3 K8s Service 变化
WebSocket 模式下,Worker 不再需要 K8s Service 进行数据通信(因为是 Worker 主动连接 Backend),但 K8s Service 仍保留用于:
- Backend 对 Worker 的 HTTP 健康检查(可选,因为 WS 连接状态本身就是健康指标)
- 未来可能的直接 HTTP 调用(如管理接口)
下一步
了解了通信机制后,请继续阅读: