上周团队在规划 Agent 的多渠道接入方案。有人说"每个 IM 写一套 adapter",有人说"统一用 Webhook 接收然后标准化"。
我打开 Hermes Agent 的代码仓库,gateway/platforms/ 目录下躺着 18 个平台适配器——从 Telegram、Discord 到钉钉、飞书、企业微信、QQ 机器人,甚至还有 iMessage(BlueBubbles)、Signal 和 Home Assistant。
“所有平台共享同一个 Agent Loop,同一套 Session 管理,同一套工具调用。”
他们问:“那 18 个 adapter 之间代码复用率有多少?”
我给他们看了一张图。
整体架构:一个 Agent,多个嘴巴
Hermes Agent 的 Gateway 架构可以抽象为一个四层模型:
┌──────────────────────────────────────────────────────────────────────┐
│ Layer 1: Platform Adapters (gateway/platforms/*.py) │
│ ┌─────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Telegram │ │Discord │ │钉钉 │ │飞书 │ │WeCom │ ... ×18 │
│ │Bot API │ │Bot API │ │Stream │ │Callback│ │Callback│ │
│ └────┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │ │ │
│ └───────────┴──────────┴──────────┴──────────┘ │
│ │ │
│ MessageEvent (标准化消息) │
│ │ │
├────────────────────────┼────────────────────────────────────────────┤
│ Layer 2: Gateway Runner (gateway/run.py) │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ SessionStore │ │ DeliveryRouter │ │
│ │ - 会话生命周期 │ │ - 消息路由 │ │
│ │ - 会话重置策略 │ │ - 跨平台投递 │ │
│ │ - AIAgent 缓存 │ │ - origin/local/显式 │ │
│ └──────────┬───────────┘ └───────────┬───────────┘ │
│ │ │ │
├─────────────┼──────────────────────────┼────────────────────────────┤
│ Layer 3: AIAgent (run_agent.py) │ │
│ ┌─────────────────────────────────┐ │ │
│ │ run_conversation() │ │ │
│ │ - LLM API 调用 │ │ │
│ │ - 工具发现与执行 │ │ │
│ │ - 上下文压缩 │ │ │
│ │ - 回调通知 → Layer 2 │ │ │
│ └─────────────────────────────────┘ │ │
│ │ │
├───────────────────────────────────────┼────────────────────────────┤
│ Layer 4: Tools & Infrastructure │ │
│ terminal / web_search / file / MCP │ ← 所有平台共享 │
│ delegate_task / cronjob / skill │ │
└───────────────────────────────────────┴────────────────────────────┘
核心思想:Layer 1 负责"翻译"——把各平台原生消息格式翻译成统一的 MessageEvent,把 Agent 输出翻译成平台原生 API 调用。Layer 2-4 完全不知道消息来自哪里,它们只看到一个标准化的对话请求。
Layer 1:平台适配器基类
所有 18 个适配器都继承自同一个抽象基类 BasePlatformAdapter:
class BasePlatformAdapter(ABC):
"""所有平台适配器的基类。
子类实现平台特定的连接、认证、收发消息逻辑。
"""
def __init__(self, config: PlatformConfig, platform: Platform):
self.config = config
self.platform = platform
self._message_handler = None # 消息处理回调(指向 GatewayRunner)
self._busy_session_handler = None # 会话忙碌时的处理回调
self._active_sessions = {} # {session_key: asyncio.Event()}
self._pending_messages = {} # 等待处理的消息队列
self._background_tasks = set() # 后台处理任务
# generated by hugo AI
每个子类只需要实现以下几个方法:
| 方法 | 职责 | 示例 |
|---|---|---|
connect() | 连接平台 API | Telegram: aiogram Bot polling |
disconnect() | 断开连接 | 关闭 polling / webhook |
send() | 发送消息 | 调用平台 sendMessage API |
send_typing() | 发送"正在输入"状态 | Telegram: sendChatAction |
send_image() | 发送图片 | 上传并发送文件 |
send_voice() | 发送语音 | TTS 音频发送 |
send_document() | 发送文件 | 文档/代码块发送 |
这就是复用率高的秘诀——基类已经实现了消息处理、会话管理、重试、流式输出、打字指示器、图片缓存等所有通用逻辑,子类只需要"填空"实现平台特定的 API 调用。
消息标准化:MessageEvent
每个平台收到原始消息后,统一包装成 MessageEvent:
class MessageType(Enum):
TEXT = "text"
PHOTO = "photo"
VOICE = "voice"
VIDEO = "video"
DOCUMENT = "document"
AUDIO = "audio"
STICKER = "sticker"
CONTACT = "contact"
LOCATION = "location"
@dataclass
class MessageEvent:
"""标准化消息事件——所有平台统一格式"""
source: SessionSource # 来源:平台、chat_id、user_id、线程 ID
text: str # 消息文本
message_type: MessageType # 消息类型
message_id: str # 平台消息 ID(用于回复)
user_id: str # 发送者 ID
user_name: Optional[str] # 发送者名称
media_urls: List[str] # 媒体文件 URL
media_types: List[str] # 媒体 MIME 类型
reply_to_message_id: Optional[str] # 回复的消息 ID
metadata: Dict[str, Any] # 平台扩展字段
def is_command(self) -> bool:
"""是否是斜杠命令"""
return self.text.startswith('/')
def get_command(self) -> Optional[str]:
"""提取命令名:/model → model"""
if not self.is_command():
return None
return self.text.split()[0][1:].split('@')[0]
def get_command_args(self) -> str:
"""提取命令参数:/model claude-sonnet → claude-sonnet"""
# generated by hugo AI
无论消息来自 Telegram 的 Message 对象、Discord 的 on_message 事件、还是钉钉的 Stream 回调,最终都归一化为这个 MessageEvent。这就是为什么 Gateway Runner 不需要关心"消息从哪来"。
Layer 2:Gateway Runner——大脑
GatewayRunner 是整个 Gateway 的控制中心,负责:
1. 适配器生命周期管理
class GatewayRunner:
def __init__(self, config: GatewayConfig):
self.config = config
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
self.session_store = SessionStore(...)
self.delivery_router = DeliveryRouter(self.config)
self._running_agents: Dict[str, AIAgent] = {} # 会话 → Agent
self._agent_cache: OrderedDict = OrderedDict() # Agent 缓存
# generated by hugo AI
启动时,GatewayRunner 遍历配置中启用的平台,动态加载对应的适配器:
async def start_all_platforms(self):
for platform in self.config.get_connected_platforms():
adapter = await self._create_adapter(platform)
adapter.set_message_handler(self._handle_message)
adapter.set_busy_session_handler(self._handle_busy_session)
adapter.set_session_store(self.session_store)
await adapter.connect()
self.adapters[platform] = adapter
# generated by hugo AI
2. Session Key——会话身份
这是 Gateway 最精妙的设计之一。不同平台的"会话"概念差异巨大:
- DM(私聊):一对一对话,每个用户一个会话
- Group(群聊):多人共享,默认每用户隔离会话
- Thread(线程):Discord/Slack 的线程,默认共享会话
- Forum Topic:Telegram 论坛话题,类似 thread
Session Key 是一个确定性的字符串,格式为:
agent:main:{platform}:{chat_type}:{chat_id}[:{user_id}]
具体规则由 build_session_key() 统一控制:
def build_session_key(source, group_sessions_per_user=True, thread_sessions_per_user=False):
platform = source.platform.value
if source.chat_type == "dm":
# 私聊:每个 DM 独立会话
return f"agent:main:{platform}:dm:{source.chat_id}"
# 群聊:默认按用户隔离
if group_sessions_per_user and source.user_id:
return f"agent:main:{platform}:group:{source.chat_id}:{source.user_id}"
# 线程:默认共享(所有参与者看到同一上下文)
if not thread_sessions_per_user:
return f"agent:main:{platform}:thread:{source.chat_id}:{source.thread_id}"
# 线程按用户隔离
return f"agent:main:{platform}:thread:{source.chat_id}:{source.thread_id}:{source.user_id}"
# generated by hugo AI
这个设计解决了一个实际问题:同一个群里,不同用户和 Agent 对话,上下文不串。同时,Discord 线程里的所有参与者看到的是同一段对话。
3. 消息处理流水线
MessageEvent 到达
│
▼
adapter.handle_message(event)
│
├─ 是斜杠命令?→ 命令旁路(/new, /stop, /approve 等)
│ └─ 不走 Agent Loop,直接由 GatewayRunner 处理
│
├─ 会话正在运行?
│ ├─ 是 → 消息进入 _pending_messages(队列)
│ │ 触发 interrupt(通知 Agent 有新消息)
│ │
│ └─ 否 → 创建新会话
│
▼
_process_message_background(event, session_key)
│
├─ 1. 预处理:图片缓存、STT 语音转文字、文档下载
├─ 2. 获取或创建 AIAgent(从缓存中取,保持 prompt caching)
├─ 3. 构建会话上下文(SessionSource → 系统提示注入)
├─ 4. 加载历史消息(从 SessionStore)
├─ 5. 调用 AIAgent.run_conversation()
├─ 6. Agent 输出通过回调 → adapter.send() 回平台
└─ 7. 清理会话标记、处理 pending messages
4. Agent 缓存:保持 Prompt Caching
这是 Gateway 和 CLI 的关键区别——Gateway 是长寿命进程,可能同时服务成百上千个会话。如果每条消息都新建 AIAgent 实例:
- Prompt Caching 全部失效——Anthropic/Claude 的 prefix caching 要求系统提示和历史消息的前缀不变
- 内存泄漏——每个 Agent 持有 LLM client、tool schemas、memory providers
- 启动延迟——每次初始化都要重新发现工具、加载 skills
解决方案:按 session_key 缓存 AIAgent 实例:
# OrderedDict 实现 LRU 淘汰
self._agent_cache: OrderedDict[str, tuple] = OrderedDict()
def _get_or_create_agent(self, session_key, ...):
if session_key in self._agent_cache:
# 缓存命中——保留 prompt caching
agent, signature = self._agent_cache[session_key]
self._agent_cache.move_to_end(session_key) # LRU 更新
return agent
# 缓存未命中——新建
agent = AIAgent(...)
self._agent_cache[session_key] = (agent, signature)
# 超过容量?淘汰最老的
if len(self._agent_cache) > _AGENT_CACHE_MAX_SIZE:
oldest_key, _ = self._agent_cache.popitem(last=False)
return agent
# generated by hugo AI
默认容量 128,空闲 TTL 1 小时。超出容量的 Agent 会被安全清理(关闭连接、保存状态)。
5. 中断机制
和 CLI 一样,Gateway 支持运行中打断。但实现方式不同:
# BasePlatformAdapter.handle_message()
async def handle_message(self, event: MessageEvent):
session_key = build_session_key(event.source)
if session_key in self._active_sessions:
# 会话正在运行——新消息触发中断
self._pending_messages[session_key] = event
self._active_sessions[session_key].set() # 触发 Event
return
# 会话空闲——创建新任务
self._active_sessions[session_key] = asyncio.Event()
task = asyncio.create_task(self._process_message_background(event, session_key))
# generated by hugo AI
关键设计:_active_sessions 是一个 asyncio.Event 字典。当新消息到达时,set() 触发 Event,后台处理任务检测到信号后安全退出(保存进度、清理状态),然后新的消息接管处理。
Layer 3:AIAgent——共享大脑
无论消息来自哪个平台,最终都汇聚到同一个 AIAgent.run_conversation()。这个函数不关心:
- 消息是 Telegram 用户发的还是钉钉群里来的
- 用户是用中文还是英文提问
- 附件是图片、语音还是文档
它只接收:
result = agent.run_conversation(
user_message=processed_text, # 处理后的文本
conversation_history=history, # 该 session 的历史
stream_callback=stream_cb, # 流式输出回调
task_id=session_key, # 会话标识
)
# generated by hugo AI
Agent 内部的回调(stream_callback、status_callback、tool_progress_callback)由 Gateway Runner 注入,把输出路由回正确的平台:
def _make_stream_callback(adapter, event):
"""为流式输出创建平台特定的回调"""
async def callback(delta: str):
# 累积 delta,按间隔编辑消息
await adapter.send_typing(event.source.chat_id)
# ... 流式更新逻辑
return callback
# generated by hugo AI
消息回流:Delivery Router
Agent 的输出不一定发回原处。Gateway 的 DeliveryRouter 支持灵活的路由:
# 投递目标格式:
# "origin" → 回消息来源
# "local" → 只保存本地文件
# "telegram" → Telegram 的 home channel
# "telegram:123456" → 指定的 Telegram 群
# "dingtalk:#dev" → 钉钉的 #dev 频道
class DeliveryTarget:
@classmethod
def parse(cls, target: str, origin: SessionSource):
if target == "origin":
return cls(platform=origin.platform, chat_id=origin.chat_id)
if target == "local":
return cls(platform=Platform.LOCAL)
if ":" in target:
# platform:chat_id 或 platform:chat_id:thread_id
parts = target.split(":", 2)
platform = Platform(parts[0])
return cls(platform=platform, chat_id=parts[1])
return cls(platform=Platform(target)) # 平台名 → home channel
# generated by hugo AI
这在 Cron 任务中特别有用——一个定时任务可以在服务器上运行,但输出投递到多个频道:
# Cron job 配置
deliver: ["origin", "dingtalk:#ops-team", "local"]
# generated by hugo AI
流式输出:跨平台的渐进式更新
IM 平台通常不支持"打字机效果",但 Gateway 实现了类似的体验:
# StreamingConfig
enabled: bool = True
transport: str = "edit" # Telegram: editMessageText
edit_interval: float = 1.0 # 每秒更新一次(Telegram 限速)
buffer_threshold: int = 40 # 积累 40 字符才发
cursor: str = " ▉" # 流式光标
# generated by hugo AI
Token 1 → buffer: "H" → 不足 40,等待
Token 2 → buffer: "He" → 等待
...
Token 40 → buffer: "Hello, 我是 Hermes Agent ▉" → 发送 editMessageText
Token 41 → buffer: "!" → 等待
Token 80 → buffer: "! 有什么可以帮你的 ▉" → 发送 editMessageText
...
完成 → 移除光标,最终编辑
不同平台的速率限制不同——Telegram 限制每秒一次消息编辑,Discord 可以连续发多条消息。BasePlatformAdapter 通过 _send_with_retry 统一处理重试和速率限制。
媒体处理流水线
IM 平台收到的图片和语音是临时 URL(Telegram 文件 URL 约 1 小时过期),需要本地缓存:
用户发送图片 (Telegram Photo)
│
▼
adapter 下载 → cache_image_from_url()
│ ├─ SSRF 防护(is_safe_url)
│ ├─ 重试 + 指数退避
│ └─ 保存到 ~/.hermes/cache/images/
│
▼
构建文本占位符: "[User sent an image: /path/to/cache/img_abc123.jpg]"
│
▼
AIAgent.run_conversation() 收到文本
│
▼
vision_analyze 工具自动识别图片 → 生成描述
│
▼
Agent 回复 → adapter.send_image() 发回平台
语音消息类似——下载 → 缓存 → STT 转文字 → Agent 处理。如果用户开启了 /voice 模式,Agent 的回复还会经过 TTS 转为语音发回。
会话生命周期管理
Gateway 的会话不是永恒的——它有一套完整生命周期:
创建 → 活跃 → (可选: 重置) → 归档
触发重置的条件:
├─ 用户主动: /new, /reset
├─ 每日重置: 凌晨 4 点自动清理
├─ 空闲超时: 24 小时无活动
└─ 上下文溢出: 压缩后仍然超限
@dataclass
class SessionResetPolicy:
mode: str = "both" # "daily", "idle", "both", "none"
at_hour: int = 4 # 每日重置时间点
idle_minutes: int = 1440 # 24 小时空闲重置
notify: bool = True # 重置时通知用户
# generated by hugo AI
策略可以按平台、会话类型分别配置。比如:
# 群聊会话每天重置,DM 永不重置
session_reset:
mode: "both"
at_hour: 4
idle_minutes: 1440
# 钉钉 DM 会话永不重置
reset_by_type:
dm:
mode: "none"
安全机制
Gateway 作为公网暴露的服务,安全是头等大事:
1. SSRF 防护
# 下载媒体文件前检查 URL 安全性
from tools.url_safety import is_safe_url
if not is_safe_url(url):
raise ValueError(f"Blocked unsafe URL (SSRF protection)")
# 重定向也要检查——防止 302 跳转到内网
async with httpx.AsyncClient(
follow_redirects=True,
event_hooks={"response": [_ssrf_redirect_guard]},
) as client:
response = await client.get(url)
# generated by hugo AI
2. DM 配对机制
未经授权的 DM 消息会被自动处理:
unauthorized_dm_behavior: str = "pair" # "pair" or "ignore"
# "pair" 模式:用户发送 /pair <code> 进行配对
# "ignore" 模式:直接忽略未授权消息
# generated by hugo AI
配对码通过 PairingStore 管理,确保只有授权用户才能与 Agent 对话。
3. UTF-16 长度计算
Telegram 的消息长度限制是 4096 UTF-16 code units,不是 Python 的 len():
def utf16_len(s: str) -> int:
"""计算 UTF-16 code units 数量"""
return len(s.encode("utf-16-le")) // 2
def truncate_message(text: str, limit: int = 4000) -> str:
"""安全截断,不切坏 surrogate pair"""
if utf16_len(text) <= limit:
return text
# 二分查找最长安全前缀
lo, hi = 0, len(text)
while lo < hi:
mid = (lo + hi + 1) // 2
if utf16_len(text[:mid]) <= limit:
lo = mid
else:
hi = mid - 1
return text[:lo]
# generated by hugo AI
这个细节很多开源项目都踩过坑——Python 的 len("😀") 返回 1,但 Telegram 认为它是 2 个 code unit。
Hook 系统:扩展点
Gateway 内置了一套事件钩子系统,允许在不修改核心代码的情况下扩展行为:
# HookRegistry 管理所有钩子
class HookRegistry:
def register(self, event_name: str, handler: Callable):
...
async def emit(self, event_name: str, *args, **kwargs):
for handler in self._handlers.get(event_name, []):
await handler(*args, **kwargs)
# 内置钩子:
# - message.received: 消息到达时
# - message.sent: 消息发出时
# - agent.start: Agent 开始处理时
# - agent.complete: Agent 完成时
# - session.reset: 会话重置时
# generated by hugo AI
比如 boot_md 钩子在 Gateway 启动时自动生成 ~/.hermes/boot.md 文件,记录当前运行状态,方便重启后恢复上下文。
架构决策总结
| 决策 | 选择 | 原因 |
|---|---|---|
| 通信模式 | 长连接(Bot polling/Stream) | 实时性好,适合 IM 场景 |
| 适配器模式 | 基类 + 子类继承 | 90% 逻辑复用,各平台只需实现 API 调用 |
| 消息格式 | 统一 MessageEvent | Agent 不需要知道平台细节 |
| 会话管理 | 确定性 Session Key | 可重现、可调试、支持重置 |
| Agent 实例 | 缓存 + LRU 淘汰 | 保持 Prompt Caching,控制内存 |
| 流式输出 | editMessageText 渐进更新 | 适应 IM 平台限制 |
| 媒体处理 | 本地缓存 + 过期清理 | 解决临时 URL 过期问题 |
| 安全 | SSRF 防护 + DM 配对 | 公网暴露必须有边界 |
一句话总结
Gateway 的本质是协议翻译器——18 个平台的差异被完全隔离在 Adapter 层,Agent 只看到一个标准化的对话流。新平台接入只需要实现 6 个方法(connect/send/send_typing/send_image/send_voice/disconnect),剩下的会话管理、Agent 缓存、流式输出、安全机制全部由基类和 Runner 提供。
你在实际项目中接入过几个 IM 平台?最大的坑是什么?欢迎留言讨论。