Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

输入输出流程

本章说明每次运行(Run)到底传了什么给容器、容器又产出了什么。如果你想了解“数据是怎么进去的、结果是怎么出来的“,这里是最完整的答案。

重要背景:系统的核心数据(对话历史、会话状态、运行记录、checkpoint 元数据等)统一由 Backend 管理并查询数据库。传递给容器的文件(如 run-context.json)只是启动时的上下文快照;容器后续如需更多数据或要持久化状态,均通过 Worker ↔ Backend 协议完成,而不是直接查库。


1. 数据流全景

graph LR
    subgraph CONTROL["控制面"]
        DB["Backend 数据库"]
        BACKEND["Backend 组装"]
        LP[LaunchPlan]
    end

    subgraph K3S_ORCH["K3s 编排"]
        CM[ConfigMap<br/>Agent 包文件]
        SEC[Secret<br/>API 密钥]
        DEP["Deployment + Service"]
    end

    subgraph INSIDE["Worker 容器"]
        IN["/workspace/in/<br/>run-context.json + 附件"]
        EXEC["agentcore"]
        CB["WebSocket → Backend"]
        SESS["协议化状态访问<br/>checkpoint / query / command"]
    end

    DB --> BACKEND --> LP
    LP --> CM
    LP --> SEC
    LP --> DEP
    DEP --> IN
    CM -->|挂载为目录| IN
    SEC -->|注入为环境变量| EXEC
    IN --> EXEC
    EXEC --> CB
    EXEC --> SESS

2. 输入:进入容器的内容

2.1 run-context.json(运行上下文投递文件)

这是 agentcore 的初始输入,由 Backend 在每次 Run 前从数据库中提取数据生成

字段数据来源说明
sessionId数据库 SessionRecord所属会话的唯一标识
latestRunId数据库 SessionRecord会话中最近一次 Run 的 ID
history数据库 SessionRecord.history截止到本轮为止的完整对话历史
sessionRuntimeState数据库 SessionRecord.runtime_state会话运行时状态(含 checkpoint 信息)
package数据库 PackageCatalogAgent 包声明:名称、描述、工具列表等
builtInTools数据库 LoadedPackageAgent 包内置工具定义
envFacts数据库 LoadedPackage环境事实(如运行时版本信息)
inputArtifacts本次请求用户上传的附件文件名列表
systemPrompt数据库 PackageCatalogAgent 包开发者定义的系统提示词
userPrompt本次请求用户本轮发送的文本消息

数据流向:数据库 → Backend write_run_context() → JSON 文件 → 容器 /workspace/in/run-context.json

为什么用文件传递初始上下文? 容器启动前 Backend 无法直接与容器通信。run-context.json 用于将完整的会话历史和上下文“搬运“进容器,使 agentcore 启动后立刻拥有完整的上下文。后续的用户消息通过 HTTP 推送到容器的 8081 端口。

2.2 用户上传的附件

用户通过 Helper 上传的文件会被 Backend 复制到工作区的 in/ 目录:

/workspace/in/
  run-context.json      ← 从数据库生成的初始上下文
  report.pdf            ← 用户上传
  data.csv              ← 用户上传

2.3 Agent 包文件

开发者在 Agent 包中定义的所有文件(提示词模板、工具脚本、配置文件等),通过 ConfigMap 挂载到容器内的 /workspace/pkg/ 目录:

/workspace/pkg/
  system-prompt.md    ← 系统提示词
  tools.json          ← 工具声明
  helper-script.py    ← 辅助脚本

2.4 API 密钥与凭据

大模型 API 密钥等敏感信息通过 Secret 以环境变量形式注入,不会写入文件系统:

OPENAI_API_KEY=sk-xxx
ANTHROPIC_API_KEY=sk-ant-xxx

凭据来源优先级:本次请求参数 > RuntimeProfile 模板 > 进程环境变量。

2.5 后续用户消息(协议推送,异步)

Worker 就绪后,后续用户消息不再写入文件,而是由 Backend 通过协议直接推送给 Worker。

{
  "type": "push_message",
  "id": "msg-001",
  "timestamp": "2026-04-17T02:30:00Z",
  "payload": {
    "runId": "019xxx",
    "sessionId": "019xxx",
    "content": "请帮我分析这个数据集",
    "artifacts": ["data.csv"]
  }
}

语义:这是异步操作。Backend 负责把消息送达 Worker,但真正的推理和结果返回稍后发生。

2.6 会话状态(多轮场景)

如果不是第一轮运行,Worker 会从两个地方获得连续性信息:

  1. run-context.json 初始快照:由 Backend 查库后生成
  2. 协议化状态请求:Worker 运行时按需向 Backend 请求更完整的 checkpoint 或业务数据

这意味着容器不再把“数据库文件”当作主要状态来源。权威状态仍在 Backend 管理的数据库中。

2.7 运行时数据请求(同步)

当 Worker 在执行过程中需要更多数据时,会通过协议向 Backend 发起同步请求:

sequenceDiagram
    participant W as Worker
    participant B as Backend
    participant DB as 数据库

    W->>B: backend_query / checkpoint_get
    B->>DB: 查询
    DB-->>B: 返回结果
    B-->>W: backend_query_result / checkpoint_get_result

语义:这是同步操作。因为 Worker 必须等到结果,才能继续后续推理或恢复。


3. 输出:结果如何返回

agentcore 通过 WebSocket(网络套接字)双向通道 将结果实时推送给 Backend;需要持久化的 checkpoint 也通过协议发送给 Backend,由 Backend 统一写入数据库。

3.1 WebSocket 实时输出

agentcore 启动后主动连接 Backend 的 WebSocket 端点 ws://backend/ws/workers/{run_id},所有输出事件通过同一条持久连接推送:

sequenceDiagram
    participant A as agentcore
    participant B as Backend
    participant DB as 数据库
    participant H as Helper

    rect rgb(230, 255, 230)
    Note over A,H: 流式输出
    loop 推理过程中
        A->>B: WS stream_chunk
        B->>H: WS 转发 token 片段
        H->>H: 实时渲染
    end
    end

    rect rgb(230, 245, 255)
    Note over A,H: 最终结果
    A->>B: WS message_response
    Note over A,B: 携带完整回复内容 + 渲染格式
    B->>DB: 追加到 session.history
    B->>H: WS 推送完成通知
    end

    rect rgb(255, 245, 230)
    Note over A,B: 持久化
    A->>B: checkpoint_put
    B->>DB: 写入 checkpoint 数据库
    B-->>A: checkpoint_put_ack
    end

3.2 事件类型

事件类型说明典型内容
message_responseagentcore 的最终回答完整文本 + 渲染格式(Markdown/代码/表格等)
stream_chunk流式推理片段token 级文本片段
ask_user向用户提问问题文本 + 可选选项列表
tool_call工具调用通知工具名、参数、执行结果
state_change状态变更Worker 生命周期状态(BUSY/IDLE/WAITING_USER)
error运行出错错误类型 + 错误信息

3.3 消息格式

{
  "runId": "019xxx",
  "sessionId": "019xxx",
  "workerName": "agent-worker-abc123",
  "eventType": "message_response",
  "payload": {
    "content": "根据数据分析,销售额环比增长了 15%...",
    "format": "markdown",
    "isComplete": true
  },
  "timestamp": "2025-01-15T10:30:00Z"
}

与文件输出的区别:agentcore 的响应直接通过 WebSocket 发送给 Backend,不再写入 final-answer.md 等文件。这使得:

  • 支持多种渲染格式(Markdown、代码块、表格、图表等),前端可以根据 format 字段选择渲染方式
  • 支持流式推送,用户无需等待完整结果(WebSocket 帧开销仅 2-10 字节,比 HTTP 请求头 300+ 字节大幅降低)
  • 支持中间交互,agentcore 可以随时发送 ask_user 事件

3.4 Checkpoint 持久化(同步写入 Backend)

Worker 不直接把 checkpoint 写到数据库文件,而是通过协议把 checkpoint 快照发送给 Backend:

{
  "type": "checkpoint_put",
  "id": "ckp-001",
  "timestamp": "2026-04-17T02:30:05Z",
  "payload": {
    "sessionId": "019xxx",
    "runId": "019xxx",
    "snapshot": {
      "historyCursor": 12,
      "toolState": {},
      "memoryState": {}
    }
  }
}

随后 Backend:

  1. 校验该 checkpoint 是否属于当前 Session / Run
  2. 写入 Backend 管理的数据库
  3. 返回 checkpoint_put_ack
  4. 更新 SessionRuntimeState

语义:这是同步操作。Worker 只有在收到 ACK 后,才能认为这份状态已经可恢复。


4. 完整输入输出汇总

graph TB
    subgraph INPUT["输入(进入 Worker)"]
        RC["run-context.json<br/>(从数据库生成)"]
        ATTACH["用户附件<br/>(上传或挂载)"]
        PKG["Agent 包文件<br/>(ConfigMap 挂载)"]
        KEYS["API 密钥<br/>(Secret 环境变量)"]
        MSG["后续消息<br/>(WebSocket 推送)"]
    end

    subgraph OUTPUT["输出(从 Worker 返回)"]
        CB_OUT["WebSocket 事件<br/>(结果 + 流式 + 交互)"]
        CKP["Checkpoint 协议写回<br/>(Backend 持久化)"]
        LOGS["日志/本地缓存<br/>(可选本地文件)"]
    end

    RC --> EXEC[agentcore]
    ATTACH --> EXEC
    PKG --> EXEC
    KEYS --> EXEC
    MSG --> EXEC
    EXEC --> CB_OUT
    EXEC --> CKP
    EXEC --> LOGS

5. LaunchPlan 的完整结构

LaunchPlan(启动计划)是 Backend 传递给 K3s 编排器的核心数据结构:

LaunchPlan
├── identity
│   ├── run_id: UUID
│   ├── session_id: UUID
│   ├── user_id: String
│   └── package_name: String
├── runtime
│   ├── image: String              // 容器镜像地址
│   ├── install_strategy: enum     // 运行时安装策略
│   ├── execution_mode: enum       // 执行模式
│   ├── requests/limits            // CPU、内存、磁盘资源
│   └── family: RuntimeFamily      // 运行时系列(Codex/ClaudeCode)
├── package_spec
│   ├── files: Map<String,String>  // Agent 包文件映射
│   └── verify_args: Vec<String>   // 验证命令参数
├── paths
│   ├── workspace_host_path        // 本机工作区根目录
│   ├── session_state_host_path    // 本机会话状态目录
│   ├── checkpoint_mount_path      // 容器内 checkpoint 路径
│   └── mount_grant: Option        // 用户授权挂载
├── strategy
│   ├── volume_backend: HostPath   // 卷后端(本地部署使用 HostPath)
│   ├── runtime_image_mode         // 镜像获取模式
│   └── checkpoint_store_mode      // Checkpoint 存储模式
├── interaction
│   ├── worker_event_channel       // 事件通道(WebSocket / HTTP 回调 / 队列消费)
│   └── worker_idle_policy         // 空闲回收策略
├── services
│   ├── provider_env: Map          // 大模型 API 密钥
│   └── external_service_bindings  // 外部服务绑定(数据库等)
└── cluster
    ├── namespace: String          // K8s 命名空间
    ├── service_account: String    // 服务账号
    └── apply_to_cluster: bool     // 是否真正提交

6. 渲染后的 K8s 资源示例

一次真实运行会产生以下 K8s 资源:

ConfigMap(存储 Agent 包文件)

apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-pkg-abc123
data:
  system-prompt.md: |
    你是一个数据分析助手...
  tools.json: |
    [{"name": "python_exec", ...}]

Secret(存储凭据)

apiVersion: v1
kind: Secret
metadata:
  name: agent-env-abc123
stringData:
  OPENAI_API_KEY: sk-xxx

Deployment + Service

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-worker-abc123
  ownerReferences: []           # 级联删除:删 Deployment 自动清理所有资源
spec:
  replicas: 1
  template:
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: worker
        image: registry.example.com/agentcore:latest
        ports:
        - containerPort: 8081
          name: control
          protocol: TCP
        readinessProbe:
          exec:
            command: ["bash", "-lc", "test -f /tmp/agent-store-worker.ready"]
          initialDelaySeconds: 2
          periodSeconds: 5
          failureThreshold: 6
        livenessProbe:
          exec:
            command: ["bash", "-lc", "test -f /tmp/agent-store-worker.ready"]
          initialDelaySeconds: 10
          periodSeconds: 15
          failureThreshold: 3
        env:
        - name: AGENT_WORKLOAD_KIND
          value: session-worker
        - name: AGENT_WORKER_EVENT_CHANNEL_KIND
          value: websocket
        - name: AGENT_WORKER_WS_URL
          value: ws://host.k3s.internal:PORT/ws/workers/RUN_ID
        envFrom:
        - secretRef:
            name: agent-env-abc123
        volumeMounts:
        - name: workspace
          mountPath: /workspace
        - name: pkg
          mountPath: /workspace/pkg
        securityContext:
          runAsNonRoot: true
          allowPrivilegeEscalation: false
          capabilities:
            drop: ["ALL"]
---
apiVersion: v1
kind: Service
metadata:
  name: agent-worker-svc-abc123
spec:
  type: ClusterIP
  ports:
  - port: 8081
    targetPort: control
    protocol: TCP
  selector:
    app: agent-worker-abc123

下一步

了解了输入输出后,请继续阅读 文件挂载机制 了解容器如何访问本机文件。