前言

在构建 LLM 服务网关时,流式响应(SSE)已经成为标配功能。用户可以实时看到 AI 的生成过程,获得更好的交互体验。然而,流式响应也带来了新的挑战:如何优雅地处理用户中断?如何在网络断开时正确清理资源?如何保证中断时已生成的内容不丢失?

本文将分享 LinkLLM 项目中对话中断流程的设计与实现,介绍如何构建一个企业级的流式对话中断机制。

问题背景

流式对话的中断场景

在实际生产环境中,流式对话面临多种中断场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────────────┐
│ 流式对话中断场景 │
├─────────────────────────────────────────────────────────────┤
│ 场景1: 用户主动中断 │
│ 用户点击停止按钮 → 需要立即停止生成 │
│ │
│ 场景2: 网络超时/断开 │
│ 网络中断 → 需要检测并清理资源 │
│ │
│ 场景3: 服务端异常 │
│ LLM API 错误 → 需要优雅降级 │
│ │
│ 场景4: 超时控制 │
│ 生成超时 → 需要超时熔断 │
└─────────────────────────────────────────────────────────────┘

现有系统的问题

在实现中断功能之前,我们的系统存在以下问题:

问题 影响 严重程度
无法响应中断 用户点击停止无效果 严重
客户端断开未检测 资源泄漏、持续计费 严重
内容丢失 数据不完整 中等
LLM 调用未取消 不必要的 API 费用 严重

架构设计

整体架构

我们采用分层架构,将中断处理逻辑与业务逻辑分离:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
┌─────────────────────────────────────────────────────────────┐
│ 分层架构图 │
├─────────────────────────────────────────────────────────────┤
│ Presentation Layer (表示层) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ ChatRouter │ │ AbortRouter │ │ SSEHandler │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
├───────────┼────────────────┼────────────────┼───────────────┤
│ Application Layer (应用层) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ StreamOrchestrator │ │
│ └────────────────────────┬────────────────────────┘ │
│ ┌─────────────┐ ┌───────▼───────┐ ┌─────────────┐ │
│ │AbortManager │ │ContentBuffer │ │AbortStrategy│ │
│ └─────────────┘ └───────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Domain Layer (领域层) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │StreamSession│ │ AbortEvent │ │StreamContext│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

核心设计模式

策略模式 用于不同类型的中断行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class AbortStrategy(ABC):
@abstractmethod
def should_stop(self, session: StreamSession) -> bool:
pass

class ImmediateAbortStrategy(AbortStrategy):
"""立即中断 - 响应最快"""
def should_stop(self, session) -> bool:
return session.abort_requested

class TimeoutAbortStrategy(AbortStrategy):
"""超时中断 - 资源保护"""
def should_stop(self, session) -> bool:
return session.elapsed_seconds >= session.timeout

观察者模式 用于中断事件通知:

1
2
3
4
5
6
7
class AbortEventPublisher:
def __init__(self):
self._listeners: List[AbortListener] = []

async def notify(self, event: AbortEvent):
for listener in self._listeners:
await listener.on_abort(event)

核心实现

AbortManager - 中断状态管理器

AbortManager 是整个中断系统的核心,采用单例模式管理所有活跃的流式会话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class AbortManager:
"""
中断状态管理器 - 单例模式

线程安全的流式会话管理,支持:
1. 注册/注销流式会话
2. 响应中断请求
3. 查询会话状态
4. 管理内容缓冲
"""

_instance: Optional[AbortManager] = None
_initialized: bool = False

def __new__(cls) -> AbortManager:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(self) -> None:
if AbortManager._initialized:
return
AbortManager._initialized = True
self._sessions: dict[str, StreamSession] = {}
self._lock = asyncio.Lock()
self._max_sessions: int = 10000

关键方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def register(
self,
session_id: str,
user_id: str,
task: asyncio.Task,
timeout: int = 60,
) -> StreamSession:
"""注册新的流式会话"""
async with self._lock:
# 如果已有活跃会话,先取消
if session_id in self._sessions:
old_session = self._sessions[session_id]
if old_session.is_active:
old_session.task.cancel()

session = StreamSession(
session_id=session_id,
user_id=user_id,
task=task,
status=StreamStatus.PENDING,
timeout=timeout,
)
self._sessions[session_id] = session
return session

def should_abort(self, session_id: str) -> bool:
"""
检查是否应该中断(非阻塞)

关键点:这是"快速检查"方法
- 不加锁,确保 O(1) 时间复杂度
- 供流式生成循环频繁调用(每个 chunk)
- 目标响应时间 < 200ms
"""
session = self._sessions.get(session_id)
if session is None:
return True # 会话不存在,应该停止
return session.abort_requested

StreamSession - 流式会话状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@dataclass
class StreamSession:
"""流式会话状态"""
session_id: str
user_id: str
task: asyncio.Task
status: StreamStatus = StreamStatus.PENDING
start_time: datetime = field(default_factory=datetime.now)
abort_requested: bool = False
abort_reason: Optional[AbortReason] = None
content_buffer: ContentBuffer = field(default_factory=ContentBuffer)
timeout: int = 60

@property
def elapsed_seconds(self) -> float:
"""已运行时间(秒)"""
return (datetime.now() - self.start_time).total_seconds()

@property
def is_active(self) -> bool:
"""是否处于活跃状态"""
return self.status in (StreamStatus.PENDING, StreamStatus.GENERATING)

状态机设计

流式会话的状态流转遵循严格的状态机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
                      register()


┌──────────────┐
│ PENDING │
└──────┬───────┘

stream_start()


┌─────────┌──────────────┐─────────┐
│ │ GENERATING │ │
│ └──────┬───────┘ │
│ │ │
error │ abort_requested stream_done
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ ERROR │ │ ABORTING │ │ COMPLETED │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ timeout│ │
│ ▼ │
│ ┌──────────────┐ │
└────────►│ TIMEOUT │◄───────┘
└──────┬───────┘

cleanup()

支持中断的流式对话服务

核心实现是 chat_stream 方法,在每个 chunk 处理时检查中断标志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async def chat_stream(
self,
session_id: str,
user_id: str,
message: str,
timeout: int = 60,
) -> AsyncGenerator[str, None]:
"""
支持中断的流式对话

核心流程:
1. 注册会话到中断管理器
2. 循环中检查中断/超时标志
3. 处理中断时保存已生成内容
4. finally 块确保资源清理
"""
current_task = asyncio.current_task()

# 1. 注册会话
stream_session = await abort_manager.register(
session_id=session_id,
user_id=user_id,
task=current_task,
timeout=timeout,
)
abort_manager.mark_generating(session_id)
start_time = datetime.now()

try:
async for chunk in runnable.astream(...):
elapsed = (datetime.now() - start_time).total_seconds()

# 2. 检查中断标志
if abort_manager.should_abort(session_id):
logger.info(f"检测到中断请求: {session_id}")
yield self._make_abort_event(session_id)
break

# 3. 检查超时
if elapsed >= timeout:
logger.warning(f"生成超时: {session_id}")
abort_manager.mark_timeout(session_id)
yield self._make_timeout_event(session_id, elapsed, timeout)
break

# 4. 超时预警 (90%)
if elapsed >= timeout * 0.9 and not warning_sent:
warning_sent = True
yield TimeoutWarningEvent(...).to_sse()

# 5. 处理 chunk
abort_manager.append_content(session_id, chunk.content)
yield create_text_event(chunk.content)

except asyncio.CancelledError:
logger.info(f"Stream cancelled: {session_id}")
abort_manager.mark_aborted(session_id)
yield AbortEvent(reason="cancelled").to_sse()
raise

finally:
# 6. 保存已生成内容(关键!)
await self._save_partial_content(session_id)
# 7. 清理会话
await abort_manager.unregister(session_id)

内容保存策略

中断时保存已生成内容是关键需求,我们通过 finally 块确保执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def _save_partial_content(self, session_id: str) -> None:
"""
保存部分生成的内容

关键点:
- 无论正常完成还是中断,都会保存
- 标记 is_partial 区分完整和部分内容
"""
session = abort_manager.get_session(session_id)
if not session:
return

buffer = abort_manager.get_content_buffer(session_id)
if buffer.is_empty:
return

is_partial = session.status in (
StreamStatus.ABORTED,
StreamStatus.TIMEOUT,
StreamStatus.ERROR
)

self.repo.create_message(
session_id=session_id,
role="ai",
content=buffer.content,
token_count=buffer.token_count,
thinking_content=buffer.thinking if buffer.thinking else None,
is_partial=is_partial,
)

API 设计

REST API 接口

方法 路径 描述
POST /api/v1/chat/completions 对话补全(支持流式)
POST /api/v1/chat/abort 中断对话
GET /api/v1/chat/stream/status 查询流式状态
POST /api/v1/chat/stream/cancel 批量取消(管理员)

中断对话接口

1
2
3
4
5
6
7
8
POST /api/v1/chat/abort
Content-Type: application/json
Authorization: Bearer {token}

{
"session_id": "abc123",
"save_content": true
}

响应:

1
2
3
4
5
6
7
8
{
"success": true,
"message": "中断请求已发送",
"session_id": "abc123",
"content_saved": true,
"partial_content_length": 150,
"elapsed_seconds": 12.5
}

SSE 事件类型

事件类型 描述 示例
thinking AI 思考过程 {"type":"thinking","data":"..."}
text 正式回复内容 {"type":"text","data":"你好"}
abort 用户中断 {"type":"abort","reason":"user_request"}
timeout 超时中断 {"type":"timeout","elapsed_seconds":60}
timeout_warning 超时预警 {"type":"timeout_warning","remaining_seconds":6}
error 错误事件 {"type":"error","error_code":"LLM_API_ERROR"}
close 生成完成 {"type":"close","msg":"生成完成"}
heartbeat 心跳保活 {"type":"heartbeat","msg":"ping"}

中断时序图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Client                 Server                 AbortManager     LLM
│ │ │ │
│ POST /completions │ │ │
│ stream=true │ │ │
│─────────────────────►│ │ │
│ │ register(session) │ │
│ │────────────────────────►│ │
│ SSE: text events │ │ │
│◄─────────────────────│◄────────────────────────│◄───────────│
│ │ │ │
│ POST /abort │ │ │
│─────────────────────►│ │ │
│ │ request_abort() │ │
│ │────────────────────────►│ │
│ │ abort_requested=true │ │
│ │◄────────────────────────│ │
│ SSE: abort event │ │ │
│◄─────────────────────│◄────────────────────────│ │
│ │ unregister(session) │ │
│ │────────────────────────►│ │

最佳实践总结

中断响应优化

1
2
3
4
5
# should_abort() 方法不加锁,直接读取内存标志位
# 时间复杂度: O(1),响应延迟: < 1ms
def should_abort(self, session_id: str) -> bool:
session = self._sessions.get(session_id)
return session.abort_requested if session else True

资源清理保证

1
2
3
4
5
6
7
8
9
10
# 正确做法:使用 finally 确保清理
try:
async for chunk in runnable.astream(...):
if should_abort():
yield abort_event
break
yield chunk_event
finally:
await save_content() # 无论如何都保存
await cleanup_session() # 无论如何都清理

性能指标

操作 时间复杂度 锁竞争 建议
should_abort() O(1) 每个chunk调用
append_content() O(1) 每个chunk调用
request_abort() O(1) 独立HTTP请求
register() O(1) 会话开始时

后续优化方向

  1. 心跳检测:添加 HeartbeatMonitor 主动检测客户端断开
  2. 集群支持:实现 RedisAbortManager 支持分布式部署
  3. 统计指标:添加 Prometheus 指标导出
  4. 重试机制:LLM API 错误时自动重试

总结

本文介绍了 LinkLLM Server 中对话中断流程的设计与实现。通过 AbortManager 统一管理流式会话状态,结合状态机模式控制会话生命周期,使用 finally 块确保资源清理和内容保存,实现了响应时间 < 200ms 的中断机制。

核心设计要点:

  • 单例模式管理全局会话状态
  • 非阻塞检查实现快速中断响应
  • finally 块保证资源清理和内容保存
  • 状态机严格控制状态转换

这套方案已经在生产环境中稳定运行,有效解决了流式对话的中断问题。