Durable Run 作业化迁移方案
Table of Contents
Summary
把当前“HTTP 请求内跑 agent + SSE 仅负责附着输出”的模型,改成“后台 durable job 驱动执行,HTTP 只负责创建 run、附着流、暂停/恢复/注入消息”。
目标效果:
浏览器断开后,run 继续执行,不依赖原始请求协程存活。
任意客户端可通过 run_id 重新附着同一个执行中的 run。
服务重启后,未完成 run 可被 worker 扫描并恢复或标记失败。
当前 ProjectDraft / ProjectTurn / WorkspaceRunState / SSE buffer 继续保留,但职责更清晰。
默认选择:先做“队列作业化 durable run”,不直接引入外部 workflow 引擎。
Key Changes
1. 抽出 Run 领域模型,停止让 chat.py 直接承载执行状态机
新增一个明确的 ChatRun 持久化对象,职责是描述一次 agent 执行本身,而不是复用 SSE buffer 或 turn payload 隐式表达。
建议新增表:
chat_runs
核心字段:
- run_id
- project_id
- session_id
- draft_id
- turn_id
- status: queued | acquiring_lock | restoring_context | running | waiting_for_user | paused | publishing | completed | failed | cancelled
- worker_id
- lease_expires_at
- continuation_of_run_id
- last_checkpoint_seq
- last_error
- created_at / started_at / updated_at / ended_at
保留现有 ProjectTurn 作为对话与产物视图,但不要再让它兼任运行时真相来源。 ProjectTurn.assistant_message_json.resume_event_id 继续用于前端展示恢复位置,但运行恢复以 chat_runs 为准。
2. 把执行循环搬到独立 worker,不再依赖 HTTP StreamingResponse
新增后台 worker 模块,例如:
apps/api/services/chat_run_worker.py
apps/api/services/chat_run_executor.py
apps/api/services/chat_run_queue.py
职责拆分:
API 路由:
- 创建 run 记录
- 入队
- 立即返回 run_id
- SSE/轮询仅做附着和消费事件
Worker:
- 抢占 run lease
- 加载上下文
- 执行 agent loop
- 持续写 checkpoint、事件流、turn/draft 状态
- 处理暂停、用户回答、proposal apply/reject、enqueue 消息
Queue:
- 可先用 Redis list / stream + DB lease
- 不要求首版引入 Celery / Temporal / Inngest
执行入口从“POST /chat 直接跑”改为:
- POST /projects/{id}/chat 创建 chat_run
- 记录 ProjectTurn(status=running) 与 draft
- 将 run_id 推入队列
- 返回 run_id
- 前端立刻连 /chat/stream/{run_id} 或沿用现有 /chat/resume/{run_id}
3. 定义可恢复 checkpoint,恢复执行而不只是恢复 SSE
当前已有:
WorkspaceRunState
ProjectDraft.partial_response_json
model_messages_json
RunEventBuffer
但这些还不足以在进程死掉后真正恢复执行。 新增 RunCheckpoint 持久化结构,按“工具边界”保存,而不是按 token 级别保存。
建议 checkpoint 内容:
run_id
seq
phase
message_history_json
assistant_steps_json
tool_calls_json
task_plan_json
workspace_run_state_json
pending_user_interrupt_json
active_tool_name
continuation_prompt
resume_from_checkpoint_kind
checkpoint 触发点:
每次 tool result 完成后
每次进入 waiting_for_user
每次完成 queued message 注入后
publish 前
publish 完成后
不要尝试在任意 token 中间恢复。 恢复语义限定为:从最近一个“已完成 tool-return 的稳定边界”继续,这和你们现有 _can_resume_from_tool_checkpoint() 思路一致,但现在要落到持久化 checkpoint,而不是只在单次请求重试里使用。
4. 重新定义 API:创建 run、附着流、控制 run
建议把接口语义改成 run-first:
POST /projects/{project_id}/chat-runs
- 创建新 run 或 continuation run
- 返回 run_id, stream_url, status
GET /projects/{project_id}/chat-runs/{run_id}/stream
- SSE 附着到运行事件
- 内部仍可复用现有 RunEventBuffer
GET /projects/{project_id}/chat-runs/{run_id}
- 查询 run 状态、当前阶段、waiting reason、saved_files
POST /projects/{project_id}/chat-runs/{run_id}/pause
POST /projects/{project_id}/chat-runs/{run_id}/resume
- 对 paused 或 waiting_for_user run 生效
POST /projects/{project_id}/chat-runs/{run_id}/input
- 统一承载 queued user message / question answer / proposal decision
- 不再分散为 continuation 请求参数和多个分支语义
现有 /chat/resume/{chat_run_id} 可以短期兼容,内部转发到新的 run stream。
5. 让 worker 具备 lease 和 crash recovery
要点:
worker 抢 run 时更新 worker_id + lease_expires_at
执行期间周期性续租
如果 worker 崩溃,lease 过期后其他 worker 可接管
接管逻辑:
- 读取最新 checkpoint
- 恢复 sandbox 连接或按 draft manifest 重建
- 从 checkpoint 指定的 continuation point 继续
如果接管时发现 checkpoint 不可恢复:
- 将 run 标记 failed
- 写 terminal 事件
- 不让 run 永远挂在 running
这一步是 durable run 的核心。 没有 lease,就只是“后台任务”;有 lease + checkpoint,才是“可恢复执行”。
6. 把 sandbox 生命周期从“请求 keepalive”改成“worker keepalive”
当前 keepalive 在 chat.py 的执行循环里。迁移后改为:
worker 负责 refresh_sandbox_lifecycle
worker 在 lease 续租时顺带刷新 sandbox TTL
run 附着的 SSE 客户端不再承担任何 keepalive 责任
恢复时的 sandbox 策略:
优先 reconnect 当前 session.e2b_sandbox_id
失效时按 ProjectDraft.code_manifest_key 恢复 workspace
再执行 reconcile_sandbox_runtime
只有在 checkpoint 标记“已进入 publishing 且未完成”时,才允许重试 publish;否则只恢复到 pre-publish 状态
7. 收敛当前 chat.py 的职责
apps/api/routers/chat.py 需要逐步瘦身到三类逻辑:
request validation / auth
run creation / control / stream attach
legacy compatibility glue
从中迁出的逻辑:
agent loop
queue interrupt 注入
finalize workspace
checkpoint flush
detached drain
worker keepalive
这样 chat.py 不再是 runtime 内核,只是 transport adapter。
Test Plan
必须覆盖以下场景:
- 创建 run 后立即断开 SSE,worker 继续执行并最终完成。
- 两个客户端同时附着同一 run_id,都能看到一致事件流。
- worker 进程在 tool result 后崩溃,run 被新 worker 从最近 checkpoint 接管并继续。
- worker 在 text streaming 中崩溃,恢复后从最近稳定 checkpoint 继续,不重复执行已完成工具。
- waiting_for_user 状态下服务重启,用户提交 answer 后 run 继续。
- proposal apply / reject 在 run-first API 下仍能正确更新资源并继续执行。
- sandbox 已被回收时,run 接管流程能用 draft manifest 恢复。
- publish 过程中失败时,不产生重复版本,也不丢失 draft。
- pause 请求到达后,run 最终进入 paused,后续 resume 可继续。
- 同一 project 同时只允许一个持有 lock 的活跃 run,冲突时返回明确错误。
Assumptions
- 首版继续使用 Redis + Postgres,不引入 Temporal / Celery / Inngest。
- 恢复粒度限定为“tool checkpoint 级”,不追求 token 级精确续跑。
- RunEventBuffer 继续保留,职责是“事件回放与多端附着”,不是运行时真相。
- ProjectDraft 继续作为 workspace 草稿真相来源;ChatRun 只表示执行状态。
- 首版只支持单 run 单 project lock,不做同项目多并发 agent run。