输入输出流程
本章说明每次运行(Run)到底传了什么给容器、容器又产出了什么。如果你想了解“数据是怎么进去的、结果是怎么出来的“,这里是最完整的答案。
重要背景:系统的核心数据(对话历史、会话状态、运行记录、checkpoint 元数据等)统一由 Backend 管理并查询数据库。传递给容器的文件(如
run-context.json)只是启动时的上下文快照;容器后续如需更多数据或要持久化状态,均通过 Worker ↔ Backend 协议完成,而不是直接查库。
1. 数据流全景
graph LR
subgraph CONTROL["控制面"]
DB["Backend 数据库"]
BACKEND["Backend 组装"]
LP[LaunchPlan]
end
subgraph K3S_ORCH["K3s 编排"]
CM[ConfigMap<br/>Agent 包文件]
SEC[Secret<br/>API 密钥]
DEP["Deployment + Service"]
end
subgraph INSIDE["Worker 容器"]
IN["/workspace/in/<br/>run-context.json + 附件"]
EXEC["agentcore"]
CB["WebSocket → Backend"]
SESS["协议化状态访问<br/>checkpoint / query / command"]
end
DB --> BACKEND --> LP
LP --> CM
LP --> SEC
LP --> DEP
DEP --> IN
CM -->|挂载为目录| IN
SEC -->|注入为环境变量| EXEC
IN --> EXEC
EXEC --> CB
EXEC --> SESS
2. 输入:进入容器的内容
2.1 run-context.json(运行上下文投递文件)
这是 agentcore 的初始输入,由 Backend 在每次 Run 前从数据库中提取数据生成:
| 字段 | 数据来源 | 说明 |
|---|---|---|
sessionId | 数据库 SessionRecord | 所属会话的唯一标识 |
latestRunId | 数据库 SessionRecord | 会话中最近一次 Run 的 ID |
history | 数据库 SessionRecord.history | 截止到本轮为止的完整对话历史 |
sessionRuntimeState | 数据库 SessionRecord.runtime_state | 会话运行时状态(含 checkpoint 信息) |
package | 数据库 PackageCatalog | Agent 包声明:名称、描述、工具列表等 |
builtInTools | 数据库 LoadedPackage | Agent 包内置工具定义 |
envFacts | 数据库 LoadedPackage | 环境事实(如运行时版本信息) |
inputArtifacts | 本次请求 | 用户上传的附件文件名列表 |
systemPrompt | 数据库 PackageCatalog | Agent 包开发者定义的系统提示词 |
userPrompt | 本次请求 | 用户本轮发送的文本消息 |
数据流向:数据库 → Backend write_run_context() → JSON 文件 → 容器 /workspace/in/run-context.json
为什么用文件传递初始上下文? 容器启动前 Backend 无法直接与容器通信。
run-context.json用于将完整的会话历史和上下文“搬运“进容器,使 agentcore 启动后立刻拥有完整的上下文。后续的用户消息通过 HTTP 推送到容器的 8081 端口。
2.2 用户上传的附件
用户通过 Helper 上传的文件会被 Backend 复制到工作区的 in/ 目录:
/workspace/in/
run-context.json ← 从数据库生成的初始上下文
report.pdf ← 用户上传
data.csv ← 用户上传
2.3 Agent 包文件
开发者在 Agent 包中定义的所有文件(提示词模板、工具脚本、配置文件等),通过 ConfigMap 挂载到容器内的 /workspace/pkg/ 目录:
/workspace/pkg/
system-prompt.md ← 系统提示词
tools.json ← 工具声明
helper-script.py ← 辅助脚本
2.4 API 密钥与凭据
大模型 API 密钥等敏感信息通过 Secret 以环境变量形式注入,不会写入文件系统:
OPENAI_API_KEY=sk-xxx
ANTHROPIC_API_KEY=sk-ant-xxx
凭据来源优先级:本次请求参数 > RuntimeProfile 模板 > 进程环境变量。
2.5 后续用户消息(协议推送,异步)
Worker 就绪后,后续用户消息不再写入文件,而是由 Backend 通过协议直接推送给 Worker。
{
"type": "push_message",
"id": "msg-001",
"timestamp": "2026-04-17T02:30:00Z",
"payload": {
"runId": "019xxx",
"sessionId": "019xxx",
"content": "请帮我分析这个数据集",
"artifacts": ["data.csv"]
}
}
语义:这是异步操作。Backend 负责把消息送达 Worker,但真正的推理和结果返回稍后发生。
2.6 会话状态(多轮场景)
如果不是第一轮运行,Worker 会从两个地方获得连续性信息:
run-context.json初始快照:由 Backend 查库后生成- 协议化状态请求:Worker 运行时按需向 Backend 请求更完整的 checkpoint 或业务数据
这意味着容器不再把“数据库文件”当作主要状态来源。权威状态仍在 Backend 管理的数据库中。
2.7 运行时数据请求(同步)
当 Worker 在执行过程中需要更多数据时,会通过协议向 Backend 发起同步请求:
sequenceDiagram
participant W as Worker
participant B as Backend
participant DB as 数据库
W->>B: backend_query / checkpoint_get
B->>DB: 查询
DB-->>B: 返回结果
B-->>W: backend_query_result / checkpoint_get_result
语义:这是同步操作。因为 Worker 必须等到结果,才能继续后续推理或恢复。
3. 输出:结果如何返回
agentcore 通过 WebSocket(网络套接字)双向通道 将结果实时推送给 Backend;需要持久化的 checkpoint 也通过协议发送给 Backend,由 Backend 统一写入数据库。
3.1 WebSocket 实时输出
agentcore 启动后主动连接 Backend 的 WebSocket 端点 ws://backend/ws/workers/{run_id},所有输出事件通过同一条持久连接推送:
sequenceDiagram
participant A as agentcore
participant B as Backend
participant DB as 数据库
participant H as Helper
rect rgb(230, 255, 230)
Note over A,H: 流式输出
loop 推理过程中
A->>B: WS stream_chunk
B->>H: WS 转发 token 片段
H->>H: 实时渲染
end
end
rect rgb(230, 245, 255)
Note over A,H: 最终结果
A->>B: WS message_response
Note over A,B: 携带完整回复内容 + 渲染格式
B->>DB: 追加到 session.history
B->>H: WS 推送完成通知
end
rect rgb(255, 245, 230)
Note over A,B: 持久化
A->>B: checkpoint_put
B->>DB: 写入 checkpoint 数据库
B-->>A: checkpoint_put_ack
end
3.2 事件类型
| 事件类型 | 说明 | 典型内容 |
|---|---|---|
message_response | agentcore 的最终回答 | 完整文本 + 渲染格式(Markdown/代码/表格等) |
stream_chunk | 流式推理片段 | token 级文本片段 |
ask_user | 向用户提问 | 问题文本 + 可选选项列表 |
tool_call | 工具调用通知 | 工具名、参数、执行结果 |
state_change | 状态变更 | Worker 生命周期状态(BUSY/IDLE/WAITING_USER) |
error | 运行出错 | 错误类型 + 错误信息 |
3.3 消息格式
{
"runId": "019xxx",
"sessionId": "019xxx",
"workerName": "agent-worker-abc123",
"eventType": "message_response",
"payload": {
"content": "根据数据分析,销售额环比增长了 15%...",
"format": "markdown",
"isComplete": true
},
"timestamp": "2025-01-15T10:30:00Z"
}
与文件输出的区别:agentcore 的响应直接通过 WebSocket 发送给 Backend,不再写入
final-answer.md等文件。这使得:
- 支持多种渲染格式(Markdown、代码块、表格、图表等),前端可以根据
format字段选择渲染方式- 支持流式推送,用户无需等待完整结果(WebSocket 帧开销仅 2-10 字节,比 HTTP 请求头 300+ 字节大幅降低)
- 支持中间交互,agentcore 可以随时发送
ask_user事件
3.4 Checkpoint 持久化(同步写入 Backend)
Worker 不直接把 checkpoint 写到数据库文件,而是通过协议把 checkpoint 快照发送给 Backend:
{
"type": "checkpoint_put",
"id": "ckp-001",
"timestamp": "2026-04-17T02:30:05Z",
"payload": {
"sessionId": "019xxx",
"runId": "019xxx",
"snapshot": {
"historyCursor": 12,
"toolState": {},
"memoryState": {}
}
}
}
随后 Backend:
- 校验该 checkpoint 是否属于当前 Session / Run
- 写入 Backend 管理的数据库
- 返回
checkpoint_put_ack - 更新
SessionRuntimeState
语义:这是同步操作。Worker 只有在收到 ACK 后,才能认为这份状态已经可恢复。
4. 完整输入输出汇总
graph TB
subgraph INPUT["输入(进入 Worker)"]
RC["run-context.json<br/>(从数据库生成)"]
ATTACH["用户附件<br/>(上传或挂载)"]
PKG["Agent 包文件<br/>(ConfigMap 挂载)"]
KEYS["API 密钥<br/>(Secret 环境变量)"]
MSG["后续消息<br/>(WebSocket 推送)"]
end
subgraph OUTPUT["输出(从 Worker 返回)"]
CB_OUT["WebSocket 事件<br/>(结果 + 流式 + 交互)"]
CKP["Checkpoint 协议写回<br/>(Backend 持久化)"]
LOGS["日志/本地缓存<br/>(可选本地文件)"]
end
RC --> EXEC[agentcore]
ATTACH --> EXEC
PKG --> EXEC
KEYS --> EXEC
MSG --> EXEC
EXEC --> CB_OUT
EXEC --> CKP
EXEC --> LOGS
5. LaunchPlan 的完整结构
LaunchPlan(启动计划)是 Backend 传递给 K3s 编排器的核心数据结构:
LaunchPlan
├── identity
│ ├── run_id: UUID
│ ├── session_id: UUID
│ ├── user_id: String
│ └── package_name: String
├── runtime
│ ├── image: String // 容器镜像地址
│ ├── install_strategy: enum // 运行时安装策略
│ ├── execution_mode: enum // 执行模式
│ ├── requests/limits // CPU、内存、磁盘资源
│ └── family: RuntimeFamily // 运行时系列(Codex/ClaudeCode)
├── package_spec
│ ├── files: Map<String,String> // Agent 包文件映射
│ └── verify_args: Vec<String> // 验证命令参数
├── paths
│ ├── workspace_host_path // 本机工作区根目录
│ ├── session_state_host_path // 本机会话状态目录
│ ├── checkpoint_mount_path // 容器内 checkpoint 路径
│ └── mount_grant: Option // 用户授权挂载
├── strategy
│ ├── volume_backend: HostPath // 卷后端(本地部署使用 HostPath)
│ ├── runtime_image_mode // 镜像获取模式
│ └── checkpoint_store_mode // Checkpoint 存储模式
├── interaction
│ ├── worker_event_channel // 事件通道(WebSocket / HTTP 回调 / 队列消费)
│ └── worker_idle_policy // 空闲回收策略
├── services
│ ├── provider_env: Map // 大模型 API 密钥
│ └── external_service_bindings // 外部服务绑定(数据库等)
└── cluster
├── namespace: String // K8s 命名空间
├── service_account: String // 服务账号
└── apply_to_cluster: bool // 是否真正提交
6. 渲染后的 K8s 资源示例
一次真实运行会产生以下 K8s 资源:
ConfigMap(存储 Agent 包文件)
apiVersion: v1
kind: ConfigMap
metadata:
name: agent-pkg-abc123
data:
system-prompt.md: |
你是一个数据分析助手...
tools.json: |
[{"name": "python_exec", ...}]
Secret(存储凭据)
apiVersion: v1
kind: Secret
metadata:
name: agent-env-abc123
stringData:
OPENAI_API_KEY: sk-xxx
Deployment + Service
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-worker-abc123
ownerReferences: [] # 级联删除:删 Deployment 自动清理所有资源
spec:
replicas: 1
template:
spec:
terminationGracePeriodSeconds: 30
containers:
- name: worker
image: registry.example.com/agentcore:latest
ports:
- containerPort: 8081
name: control
protocol: TCP
readinessProbe:
exec:
command: ["bash", "-lc", "test -f /tmp/agent-store-worker.ready"]
initialDelaySeconds: 2
periodSeconds: 5
failureThreshold: 6
livenessProbe:
exec:
command: ["bash", "-lc", "test -f /tmp/agent-store-worker.ready"]
initialDelaySeconds: 10
periodSeconds: 15
failureThreshold: 3
env:
- name: AGENT_WORKLOAD_KIND
value: session-worker
- name: AGENT_WORKER_EVENT_CHANNEL_KIND
value: websocket
- name: AGENT_WORKER_WS_URL
value: ws://host.k3s.internal:PORT/ws/workers/RUN_ID
envFrom:
- secretRef:
name: agent-env-abc123
volumeMounts:
- name: workspace
mountPath: /workspace
- name: pkg
mountPath: /workspace/pkg
securityContext:
runAsNonRoot: true
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
---
apiVersion: v1
kind: Service
metadata:
name: agent-worker-svc-abc123
spec:
type: ClusterIP
ports:
- port: 8081
targetPort: control
protocol: TCP
selector:
app: agent-worker-abc123
下一步
了解了输入输出后,请继续阅读 文件挂载机制 了解容器如何访问本机文件。