前言
在构建 LLM 服务网关时,流式响应(SSE)已经成为标配功能。用户可以实时看到 AI 的生成过程,获得更好的交互体验。然而,流式响应也带来了新的挑战:如何优雅地处理用户中断?如何在网络断开时正确清理资源?如何保证中断时已生成的内容不丢失?
本文将分享 LinkLLM 项目中对话中断流程的设计与实现,介绍如何构建一个企业级的流式对话中断机制。
问题背景
流式对话的中断场景
在实际生产环境中,流式对话面临多种中断场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ┌─────────────────────────────────────────────────────────────┐ │ 流式对话中断场景 │ ├─────────────────────────────────────────────────────────────┤ │ 场景1: 用户主动中断 │ │ 用户点击停止按钮 → 需要立即停止生成 │ │ │ │ 场景2: 网络超时/断开 │ │ 网络中断 → 需要检测并清理资源 │ │ │ │ 场景3: 服务端异常 │ │ LLM API 错误 → 需要优雅降级 │ │ │ │ 场景4: 超时控制 │ │ 生成超时 → 需要超时熔断 │ └─────────────────────────────────────────────────────────────┘
|
现有系统的问题
在实现中断功能之前,我们的系统存在以下问题:
| 问题 |
影响 |
严重程度 |
| 无法响应中断 |
用户点击停止无效果 |
严重 |
| 客户端断开未检测 |
资源泄漏、持续计费 |
严重 |
| 内容丢失 |
数据不完整 |
中等 |
| LLM 调用未取消 |
不必要的 API 费用 |
严重 |
架构设计
整体架构
我们采用分层架构,将中断处理逻辑与业务逻辑分离:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| ┌─────────────────────────────────────────────────────────────┐ │ 分层架构图 │ ├─────────────────────────────────────────────────────────────┤ │ Presentation Layer (表示层) │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ ChatRouter │ │ AbortRouter │ │ SSEHandler │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ ├───────────┼────────────────┼────────────────┼───────────────┤ │ Application Layer (应用层) │ │ ┌─────────────────────────────────────────────────┐ │ │ │ StreamOrchestrator │ │ │ └────────────────────────┬────────────────────────┘ │ │ ┌─────────────┐ ┌───────▼───────┐ ┌─────────────┐ │ │ │AbortManager │ │ContentBuffer │ │AbortStrategy│ │ │ └─────────────┘ └───────────────┘ └─────────────┘ │ ├─────────────────────────────────────────────────────────────┤ │ Domain Layer (领域层) │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │StreamSession│ │ AbortEvent │ │StreamContext│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────────┘
|
核心设计模式
策略模式 用于不同类型的中断行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class AbortStrategy(ABC): @abstractmethod def should_stop(self, session: StreamSession) -> bool: pass
class ImmediateAbortStrategy(AbortStrategy): """立即中断 - 响应最快""" def should_stop(self, session) -> bool: return session.abort_requested
class TimeoutAbortStrategy(AbortStrategy): """超时中断 - 资源保护""" def should_stop(self, session) -> bool: return session.elapsed_seconds >= session.timeout
|
观察者模式 用于中断事件通知:
1 2 3 4 5 6 7
| class AbortEventPublisher: def __init__(self): self._listeners: List[AbortListener] = []
async def notify(self, event: AbortEvent): for listener in self._listeners: await listener.on_abort(event)
|
核心实现
AbortManager - 中断状态管理器
AbortManager 是整个中断系统的核心,采用单例模式管理所有活跃的流式会话:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class AbortManager: """ 中断状态管理器 - 单例模式
线程安全的流式会话管理,支持: 1. 注册/注销流式会话 2. 响应中断请求 3. 查询会话状态 4. 管理内容缓冲 """
_instance: Optional[AbortManager] = None _initialized: bool = False
def __new__(cls) -> AbortManager: if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance
def __init__(self) -> None: if AbortManager._initialized: return AbortManager._initialized = True self._sessions: dict[str, StreamSession] = {} self._lock = asyncio.Lock() self._max_sessions: int = 10000
|
关键方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| async def register( self, session_id: str, user_id: str, task: asyncio.Task, timeout: int = 60, ) -> StreamSession: """注册新的流式会话""" async with self._lock: if session_id in self._sessions: old_session = self._sessions[session_id] if old_session.is_active: old_session.task.cancel()
session = StreamSession( session_id=session_id, user_id=user_id, task=task, status=StreamStatus.PENDING, timeout=timeout, ) self._sessions[session_id] = session return session
def should_abort(self, session_id: str) -> bool: """ 检查是否应该中断(非阻塞)
关键点:这是"快速检查"方法 - 不加锁,确保 O(1) 时间复杂度 - 供流式生成循环频繁调用(每个 chunk) - 目标响应时间 < 200ms """ session = self._sessions.get(session_id) if session is None: return True return session.abort_requested
|
StreamSession - 流式会话状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @dataclass class StreamSession: """流式会话状态""" session_id: str user_id: str task: asyncio.Task status: StreamStatus = StreamStatus.PENDING start_time: datetime = field(default_factory=datetime.now) abort_requested: bool = False abort_reason: Optional[AbortReason] = None content_buffer: ContentBuffer = field(default_factory=ContentBuffer) timeout: int = 60
@property def elapsed_seconds(self) -> float: """已运行时间(秒)""" return (datetime.now() - self.start_time).total_seconds()
@property def is_active(self) -> bool: """是否处于活跃状态""" return self.status in (StreamStatus.PENDING, StreamStatus.GENERATING)
|
状态机设计
流式会话的状态流转遵循严格的状态机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| register() │ ▼ ┌──────────────┐ │ PENDING │ └──────┬───────┘ │ stream_start() │ ▼ ┌─────────┌──────────────┐─────────┐ │ │ GENERATING │ │ │ └──────┬───────┘ │ │ │ │ error │ abort_requested stream_done │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ ERROR │ │ ABORTING │ │ COMPLETED │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ timeout│ │ │ ▼ │ │ ┌──────────────┐ │ └────────►│ TIMEOUT │◄───────┘ └──────┬───────┘ │ cleanup()
|
支持中断的流式对话服务
核心实现是 chat_stream 方法,在每个 chunk 处理时检查中断标志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| async def chat_stream( self, session_id: str, user_id: str, message: str, timeout: int = 60, ) -> AsyncGenerator[str, None]: """ 支持中断的流式对话
核心流程: 1. 注册会话到中断管理器 2. 循环中检查中断/超时标志 3. 处理中断时保存已生成内容 4. finally 块确保资源清理 """ current_task = asyncio.current_task()
stream_session = await abort_manager.register( session_id=session_id, user_id=user_id, task=current_task, timeout=timeout, ) abort_manager.mark_generating(session_id) start_time = datetime.now()
try: async for chunk in runnable.astream(...): elapsed = (datetime.now() - start_time).total_seconds()
if abort_manager.should_abort(session_id): logger.info(f"检测到中断请求: {session_id}") yield self._make_abort_event(session_id) break
if elapsed >= timeout: logger.warning(f"生成超时: {session_id}") abort_manager.mark_timeout(session_id) yield self._make_timeout_event(session_id, elapsed, timeout) break
if elapsed >= timeout * 0.9 and not warning_sent: warning_sent = True yield TimeoutWarningEvent(...).to_sse()
abort_manager.append_content(session_id, chunk.content) yield create_text_event(chunk.content)
except asyncio.CancelledError: logger.info(f"Stream cancelled: {session_id}") abort_manager.mark_aborted(session_id) yield AbortEvent(reason="cancelled").to_sse() raise
finally: await self._save_partial_content(session_id) await abort_manager.unregister(session_id)
|
内容保存策略
中断时保存已生成内容是关键需求,我们通过 finally 块确保执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| async def _save_partial_content(self, session_id: str) -> None: """ 保存部分生成的内容
关键点: - 无论正常完成还是中断,都会保存 - 标记 is_partial 区分完整和部分内容 """ session = abort_manager.get_session(session_id) if not session: return
buffer = abort_manager.get_content_buffer(session_id) if buffer.is_empty: return
is_partial = session.status in ( StreamStatus.ABORTED, StreamStatus.TIMEOUT, StreamStatus.ERROR )
self.repo.create_message( session_id=session_id, role="ai", content=buffer.content, token_count=buffer.token_count, thinking_content=buffer.thinking if buffer.thinking else None, is_partial=is_partial, )
|
API 设计
REST API 接口
| 方法 |
路径 |
描述 |
| POST |
/api/v1/chat/completions |
对话补全(支持流式) |
| POST |
/api/v1/chat/abort |
中断对话 |
| GET |
/api/v1/chat/stream/status |
查询流式状态 |
| POST |
/api/v1/chat/stream/cancel |
批量取消(管理员) |
中断对话接口
1 2 3 4 5 6 7 8
| POST /api/v1/chat/abort Content-Type: application/json Authorization: Bearer {token}
{ "session_id": "abc123", "save_content": true }
|
响应:
1 2 3 4 5 6 7 8
| { "success": true, "message": "中断请求已发送", "session_id": "abc123", "content_saved": true, "partial_content_length": 150, "elapsed_seconds": 12.5 }
|
SSE 事件类型
| 事件类型 |
描述 |
示例 |
thinking |
AI 思考过程 |
{"type":"thinking","data":"..."} |
text |
正式回复内容 |
{"type":"text","data":"你好"} |
abort |
用户中断 |
{"type":"abort","reason":"user_request"} |
timeout |
超时中断 |
{"type":"timeout","elapsed_seconds":60} |
timeout_warning |
超时预警 |
{"type":"timeout_warning","remaining_seconds":6} |
error |
错误事件 |
{"type":"error","error_code":"LLM_API_ERROR"} |
close |
生成完成 |
{"type":"close","msg":"生成完成"} |
heartbeat |
心跳保活 |
{"type":"heartbeat","msg":"ping"} |
中断时序图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Client Server AbortManager LLM │ │ │ │ │ POST /completions │ │ │ │ stream=true │ │ │ │─────────────────────►│ │ │ │ │ register(session) │ │ │ │────────────────────────►│ │ │ SSE: text events │ │ │ │◄─────────────────────│◄────────────────────────│◄───────────│ │ │ │ │ │ POST /abort │ │ │ │─────────────────────►│ │ │ │ │ request_abort() │ │ │ │────────────────────────►│ │ │ │ abort_requested=true │ │ │ │◄────────────────────────│ │ │ SSE: abort event │ │ │ │◄─────────────────────│◄────────────────────────│ │ │ │ unregister(session) │ │ │ │────────────────────────►│ │
|
最佳实践总结
中断响应优化
1 2 3 4 5
|
def should_abort(self, session_id: str) -> bool: session = self._sessions.get(session_id) return session.abort_requested if session else True
|
资源清理保证
1 2 3 4 5 6 7 8 9 10
| try: async for chunk in runnable.astream(...): if should_abort(): yield abort_event break yield chunk_event finally: await save_content() await cleanup_session()
|
性能指标
| 操作 |
时间复杂度 |
锁竞争 |
建议 |
should_abort() |
O(1) |
无 |
每个chunk调用 |
append_content() |
O(1) |
无 |
每个chunk调用 |
request_abort() |
O(1) |
有 |
独立HTTP请求 |
register() |
O(1) |
有 |
会话开始时 |
后续优化方向
- 心跳检测:添加 HeartbeatMonitor 主动检测客户端断开
- 集群支持:实现 RedisAbortManager 支持分布式部署
- 统计指标:添加 Prometheus 指标导出
- 重试机制:LLM API 错误时自动重试
总结
本文介绍了 LinkLLM Server 中对话中断流程的设计与实现。通过 AbortManager 统一管理流式会话状态,结合状态机模式控制会话生命周期,使用 finally 块确保资源清理和内容保存,实现了响应时间 < 200ms 的中断机制。
核心设计要点:
- 单例模式管理全局会话状态
- 非阻塞检查实现快速中断响应
- finally 块保证资源清理和内容保存
- 状态机严格控制状态转换
这套方案已经在生产环境中稳定运行,有效解决了流式对话的中断问题。